You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2020/05/01 08:59:27 UTC
[ignite] branch master updated: IGNITE-11073: Cluster snapshots of
persisted caches (#7760)
This is an automated email from the ASF dual-hosted git repository.
mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5ad8944 IGNITE-11073: Cluster snapshots of persisted caches (#7760)
5ad8944 is described below
commit 5ad8944992e49788828e915a8d068f3706616f9a
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Fri May 1 11:58:58 2020 +0300
IGNITE-11073: Cluster snapshots of persisted caches (#7760)
---
...IgnitePersistenceCompatibilityAbstractTest.java | 2 +-
.../src/main/java/org/apache/ignite/Ignite.java | 5 +
.../java/org/apache/ignite/IgniteSnapshot.java | 42 +
.../ignite/configuration/IgniteConfiguration.java | 31 +
.../org/apache/ignite/internal/IgniteFeatures.java | 3 +
.../org/apache/ignite/internal/IgniteKernal.java | 11 +-
.../ignite/internal/MarshallerContextImpl.java | 97 +-
.../internal/MarshallerMappingFileStore.java | 29 +-
.../internal/managers/IgniteMBeansManager.java | 6 +
.../managers/discovery/GridDiscoveryManager.java | 34 +-
.../ignite/internal/pagemem/store/PageStore.java | 16 +-
.../internal/pagemem/store/PageWriteListener.java | 35 +
.../processors/cache/ClusterCachesInfo.java | 22 +-
.../internal/processors/cache/ExchangeContext.java | 6 +-
.../processors/cache/GridCacheProcessor.java | 3 +
.../processors/cache/GridCacheSharedContext.java | 17 +
.../cache/binary/BinaryMetadataFileStore.java | 33 +-
.../binary/CacheObjectBinaryProcessorImpl.java | 55 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 32 +-
.../cache/persistence/DbCheckpointListener.java | 6 +
.../GridCacheDatabaseSharedManager.java | 26 +-
.../cache/persistence/file/FilePageStore.java | 57 +-
.../persistence/file/FilePageStoreManager.java | 269 +++--
.../persistence/partstate/GroupPartitionId.java | 2 +-
.../snapshot/IgniteCacheSnapshotManager.java | 4 +-
.../snapshot/IgniteSnapshotManager.java | 1233 ++++++++++++++++++++
.../persistence/snapshot/SnapshotFutureTask.java | 1010 ++++++++++++++++
.../persistence/snapshot/SnapshotMXBeanImpl.java | 41 +
.../cache/persistence/snapshot/SnapshotSender.java | 234 ++++
.../wal/reader/IgniteWalIteratorFactory.java | 2 +-
.../wal/reader/StandaloneGridKernalContext.java | 5 +-
.../cacheobject/IgniteCacheObjectProcessor.java | 8 +-
.../marshaller/GridMarshallerMappingProcessor.java | 15 +-
.../apache/ignite/internal/util/IgniteUtils.java | 14 +-
.../util/distributed/DistributedProcess.java | 50 +-
.../internal/util/distributed/InitMessage.java | 2 +-
.../org/apache/ignite/mxbean/SnapshotMXBean.java | 35 +
.../GridCacheRebalancingSyncSelfTest.java | 2 +-
.../IgnitePdsBinaryMetadataAsyncWritingTest.java | 5 +-
...gnitePdsBinaryMetadataOnClusterRestartTest.java | 6 +-
.../IgnitePdsNoSpaceLeftOnDeviceTest.java | 5 +-
.../db/wal/IgniteWalIteratorSwitchSegmentTest.java | 2 +
.../pagemem/BPlusTreePageMemoryImplTest.java | 1 +
.../BPlusTreeReuseListPageMemoryImplTest.java | 1 +
.../pagemem/IndexStoragePageMemoryImplTest.java | 1 +
.../pagemem/PageMemoryImplNoLoadTest.java | 1 +
.../persistence/pagemem/PageMemoryImplTest.java | 1 +
.../snapshot/AbstractSnapshotSelfTest.java | 513 ++++++++
.../snapshot/IgniteClusterSnapshotSelfTest.java | 945 +++++++++++++++
.../snapshot/IgniteSnapshotMXBeanTest.java | 80 ++
.../snapshot/IgniteSnapshotManagerSelfTest.java | 439 +++++++
.../internal/processors/igfs/IgfsIgniteMock.java | 8 +
.../loadtests/hashmap/GridCacheTestContext.java | 1 +
.../ignite/platform/PlatformDeployServiceTask.java | 11 +-
.../apache/ignite/testframework/GridTestUtils.java | 17 +-
.../testframework/junits/GridAbstractTest.java | 5 +-
.../ignite/testframework/junits/IgniteMock.java | 6 +
.../junits/common/GridCommonAbstractTest.java | 7 +-
.../junits/multijvm/IgniteProcessProxy.java | 6 +
.../IgniteBasicWithPersistenceTestSuite.java | 9 +-
.../cache/metric/SqlViewExporterSpiTest.java | 4 +-
.../IgniteClusterSnapshotWithIndexesTest.java | 274 +++++
.../testsuites/IgnitePdsWithIndexingTestSuite.java | 4 +-
.../java/org/apache/ignite/IgniteSpringBean.java | 5 +
64 files changed, 5613 insertions(+), 238 deletions(-)
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/IgnitePersistenceCompatibilityAbstractTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/IgnitePersistenceCompatibilityAbstractTest.java
index b08f6f7..3aa75a5 100644
--- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/IgnitePersistenceCompatibilityAbstractTest.java
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/IgnitePersistenceCompatibilityAbstractTest.java
@@ -24,8 +24,8 @@ import org.apache.ignite.compatibility.testframework.junits.IgniteCompatibilityA
import org.apache.ignite.compatibility.testframework.util.CompatibilityTestsUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.BINARY_META_FOLDER;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
-import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.BINARY_META_FOLDER;
/**
* Super class for all persistence compatibility tests.
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 14b5797..b1e3b54 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -733,4 +733,9 @@ public interface Ignite extends AutoCloseable {
* @return Instance of {@link IgniteEncryption} interface.
*/
public IgniteEncryption encryption();
+
+ /**
+ * @return Snapshot manager.
+ */
+ public IgniteSnapshot snapshot();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
new file mode 100644
index 0000000..753d427
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import org.apache.ignite.lang.IgniteFuture;
+
+/**
+ * This interface provides functionality for creating cluster-wide cache data snapshots.
+ * <p>
+ * Current limitations:
+ * <ul>
+ * <li>Snapshot will trigger PME (partition map exchange) to run itself.</li>
+ * <li>Snapshot will be taken from all registered persistence caches to
+ * guarantee data consistency between them.</li>
+ * <li>Snapshot must be restored manually on the switched-off cluster by copying data
+ * to the working directory on each cluster node.</li>
+ * </ul>
+ */
+public interface IgniteSnapshot {
+ /**
+ * Create a consistent copy of all persistence cache groups from the whole cluster.
+ *
+ * @param name Snapshot unique name which satisfies the following name pattern [a-zA-Z0-9_].
+ * @return Future which will be completed when a process ends.
+ */
+ public IgniteFuture<Void> createSnapshot(String name);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 2ba4e29..313d462 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -221,6 +221,9 @@ public class IgniteConfiguration {
/** Default value for cache sanity check enabled flag. */
public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true;
+ /** Default relative working directory path for snapshot operation results. */
+ public static final String DFLT_SNAPSHOT_DIRECTORY = "snapshots";
+
/** Default value for late affinity assignment flag. */
@Deprecated
public static final boolean DFLT_LATE_AFF_ASSIGNMENT = true;
@@ -552,6 +555,13 @@ public class IgniteConfiguration {
/** Page memory configuration. */
private DataStorageConfiguration dsCfg;
+ /**
+ * Directory where all results of snapshot operations will be stored. The internal
+ * {@link U#resolveWorkDirectory(String, String, boolean)} is used to configure the
+ * snapshot working directory.
+ */
+ private String snapshotPath = DFLT_SNAPSHOT_DIRECTORY;
+
/** Active on start flag. */
@Deprecated
private boolean activeOnStart = DFLT_ACTIVE_ON_START;
@@ -711,6 +721,7 @@ public class IgniteConfiguration {
segPlc = cfg.getSegmentationPolicy();
segResolveAttempts = cfg.getSegmentationResolveAttempts();
segResolvers = cfg.getSegmentationResolvers();
+ snapshotPath = cfg.getSnapshotPath();
sndRetryCnt = cfg.getNetworkSendRetryCount();
sndRetryDelay = cfg.getNetworkSendRetryDelay();
sqlConnCfg = cfg.getSqlConnectorConfiguration();
@@ -3153,6 +3164,26 @@ public class IgniteConfiguration {
}
/**
+ * @return Snapshot working directory path. By default, the relative {@link #DFLT_SNAPSHOT_DIRECTORY} is used.
+ * The value can be configured either as a path relative to the Ignite {@link #getWorkDirectory()} or
+ * as an absolute snapshot working path.
+ */
+ public String getSnapshotPath() {
+ return snapshotPath;
+ }
+
+ /**
+ * @param snapshotPath Snapshot working directory path. By default, the relative {@link #DFLT_SNAPSHOT_DIRECTORY}
+ * is used. The value can be configured either as a path relative to the Ignite {@link #getWorkDirectory()}
+ * or as an absolute snapshot working path instead.
+ */
+ public IgniteConfiguration setSnapshotPath(String snapshotPath) {
+ this.snapshotPath = snapshotPath;
+
+ return this;
+ }
+
+ /**
* Gets grid warmup closure. This closure will be executed before actual grid instance start. Configuration of
* a starting instance will be passed to the closure so it can decide what operations to warm up.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index a5f7b4f..0bba3f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -95,6 +95,9 @@ public enum IgniteFeatures {
*/
SAFE_CLUSTER_DEACTIVATION(22),
+ /** Persistence caches can be snapshotted. */
+ PERSISTENCE_CACHE_SNAPSHOT(23),
+
/** Long operations dump timeout. */
LONG_OPERATIONS_DUMP_TIMEOUT(30);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 3685d71..7138214 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -84,6 +84,7 @@ import org.apache.ignite.IgniteScheduler;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.IgniteSet;
+import org.apache.ignite.IgniteSnapshot;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.Ignition;
@@ -246,7 +247,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION
import static org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SUCCESS_FILE;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
-import static org.apache.ignite.IgniteSystemProperties.snapshot;
import static org.apache.ignite.internal.GridKernalState.DISCONNECTED;
import static org.apache.ignite.internal.GridKernalState.STARTED;
import static org.apache.ignite.internal.GridKernalState.STARTING;
@@ -1820,7 +1820,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
try {
// Stick all system properties into node's attributes overwriting any
// identical names from environment properties.
- for (Map.Entry<Object, Object> e : snapshot().entrySet()) {
+ for (Map.Entry<Object, Object> e : IgniteSystemProperties.snapshot().entrySet()) {
String key = (String)e.getKey();
if (incProps == null || U.containsStringArray(incProps, key, true) ||
@@ -2829,7 +2829,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
assert log != null;
if (log.isDebugEnabled() && S.includeSensitive())
- for (Map.Entry<Object, Object> entry : snapshot().entrySet())
+ for (Map.Entry<Object, Object> entry : IgniteSystemProperties.snapshot().entrySet())
log.debug("System property [" + entry.getKey() + '=' + entry.getValue() + ']');
}
@@ -4042,6 +4042,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/** {@inheritDoc} */
+ @Override public IgniteSnapshot snapshot() {
+ return ctx.cache().context().snapshotMgr();
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<MemoryMetrics> memoryMetrics() {
return DataRegionMetricsAdapter.collectionOf(dataRegionMetrics());
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index e355300..a4625ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -32,6 +32,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -168,27 +170,73 @@ public class MarshallerContextImpl implements MarshallerContext {
}
/**
- * @param platformId Platform id.
- * @param marshallerMappings All marshaller mappings for given platformId.
- * @throws IgniteCheckedException In case of failure to process incoming marshaller mappings.
+ * @param log Ignite logger.
+ * @param mappings All marshaller mappings to write.
*/
- public void onMappingDataReceived(byte platformId, Map<Integer, MappedName> marshallerMappings)
- throws IgniteCheckedException
- {
- ConcurrentMap<Integer, MappedName> platformCache = getCacheFor(platformId);
+ public void onMappingDataReceived(IgniteLogger log, List<Map<Integer, MappedName>> mappings) {
+ addPlatformMappings(log,
+ mappings,
+ this::getCacheFor,
+ (mappedName, clsName) ->
+ mappedName == null || F.isEmpty(clsName) || !clsName.equals(mappedName.className()),
+ fileStore);
+ }
- for (Map.Entry<Integer, MappedName> e : marshallerMappings.entrySet()) {
- int typeId = e.getKey();
- String clsName = e.getValue().className();
+ /**
+ * @param ctx Kernal context.
+ * @param mappings Marshaller mappings to save.
+ * @param dir Directory to save given mappings to.
+ */
+ public static void saveMappings(GridKernalContext ctx, List<Map<Integer, MappedName>> mappings, File dir) {
+ MarshallerMappingFileStore writer = new MarshallerMappingFileStore(ctx,
+ mappingFileStoreWorkDir(dir.getAbsolutePath()));
+
+ addPlatformMappings(ctx.log(MarshallerContextImpl.class),
+ mappings,
+ b -> new ConcurrentHashMap<>(),
+ (mappedName, clsName) -> true,
+ writer);
+ }
- MappedName mappedName = platformCache.get(typeId);
+ /**
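+ * @param mappings Per-platform marshaller mappings, indexed by platform id.
+ * @param mappedCache Function resolving, for a given platform id, the cache to attach new mappings to.
+ * @param cacheAddPred Predicate over (cached mapping, class name) deciding whether a mapping is added.
+ * @param writer Persistence mapping writer.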
+ */
+ private static void addPlatformMappings(
+ IgniteLogger log,
+ List<Map<Integer, MappedName>> mappings,
+ Function<Byte, ConcurrentMap<Integer, MappedName>> mappedCache,
+ BiPredicate<MappedName, String> cacheAddPred,
+ MarshallerMappingFileStore writer
+ ) {
+ if (mappings == null)
+ return;
- if (mappedName != null && !F.isEmpty(clsName) && clsName.equals(mappedName.className()))
+ for (byte platformId = 0; platformId < mappings.size(); platformId++) {
+ Map<Integer, MappedName> attach = mappings.get(platformId);
+
+ if (attach == null)
continue;
- platformCache.put(typeId, new MappedName(clsName, true));
+ ConcurrentMap<Integer, MappedName> cached = mappedCache.apply(platformId);
+
+ for (Map.Entry<Integer, MappedName> e : attach.entrySet()) {
+ Integer typeId = e.getKey();
+ String clsName = e.getValue().className();
+
+ if (cacheAddPred.test(cached.get(typeId), clsName)) {
+ try {
+ cached.put(typeId, new MappedName(clsName, true));
- fileStore.mergeAndWriteMapping(platformId, typeId, clsName);
+ writer.mergeAndWriteMapping(platformId, typeId, clsName);
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to write marshaller mapping data", ex);
+ }
+ }
+ }
}
}
@@ -198,7 +246,7 @@ public class MarshallerContextImpl implements MarshallerContext {
* @param fileName File name.
*/
public void checkHasClassName(String clsName, ClassLoader ldr, String fileName) {
- ConcurrentMap cache = getCacheFor(JAVA_ID);
+ ConcurrentMap<Integer, MappedName> cache = getCacheFor(JAVA_ID);
if (!cache.containsKey(clsName.hashCode()))
throw new IgniteException("Failed to read class name from class names properties file. " +
@@ -502,10 +550,10 @@ public class MarshallerContextImpl implements MarshallerContext {
IgniteConfiguration cfg = ctx.config();
String workDir = U.workDirectory(cfg.getWorkDirectory(), cfg.getIgniteHome());
- final IgniteLogger fileStoreLog = ctx.log(MarshallerMappingFileStore.class);
fileStore = marshallerMappingFileStoreDir == null ?
- new MarshallerMappingFileStore(workDir, fileStoreLog) :
- new MarshallerMappingFileStore(fileStoreLog, marshallerMappingFileStoreDir);
+ new MarshallerMappingFileStore(ctx, mappingFileStoreWorkDir(workDir)) :
+ new MarshallerMappingFileStore(ctx, marshallerMappingFileStoreDir);
+
this.transport = transport;
closProc = ctx.closure();
clientNode = ctx.clientNode();
@@ -515,6 +563,19 @@ public class MarshallerContextImpl implements MarshallerContext {
}
/**
+ * @param igniteWorkDir Base ignite working directory.
+ * @return Resolved directory.
+ */
+ public static File mappingFileStoreWorkDir(String igniteWorkDir) {
+ try {
+ return U.resolveWorkDirectory(igniteWorkDir, "marshaller", false);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
*
*/
public void onMarshallerProcessorStop() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
index 8709d77..7945352 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ignite.internal;
import java.io.BufferedReader;
@@ -45,6 +46,9 @@ import org.apache.ignite.marshaller.MarshallerContext;
* when a classname is requested but is not presented in local cache of {@link MarshallerContextImpl}.
*/
final class MarshallerMappingFileStore {
+ /** */
+ private static final String FILE_EXTENSION = ".classname";
+
/** File lock timeout in milliseconds. */
private static final int FILE_LOCK_TIMEOUT_MS = 5000;
@@ -57,26 +61,15 @@ final class MarshallerMappingFileStore {
/** Marshaller mapping directory */
private final File workDir;
- /** */
- private final String FILE_EXTENSION = ".classname";
-
/**
- * @param igniteWorkDir Ignite work directory
- * @param log Logger.
- */
- MarshallerMappingFileStore(String igniteWorkDir, IgniteLogger log) throws IgniteCheckedException {
- workDir = U.resolveWorkDirectory(igniteWorkDir, "marshaller", false);
- this.log = log;
- }
-
- /**
- * Creates marshaller mapping file store with custom predefined work directory
- * @param log logger.
- * @param marshallerMappingFileStoreDir custom marshaller work directory
+ * Creates marshaller mapping file store with custom predefined work directory.
+ *
+ * @param workDir custom marshaller work directory.
+ * @param kctx Grid kernal context.
*/
- MarshallerMappingFileStore(final IgniteLogger log, final File marshallerMappingFileStoreDir) {
- this.workDir = marshallerMappingFileStoreDir;
- this.log = log;
+ MarshallerMappingFileStore(GridKernalContext kctx, final File workDir) {
+ this.workDir = workDir;
+ log = kctx.log(MarshallerMappingFileStore.class);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
index d31f3ba..03750d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.TransactionMetricsMxBeanImpl;
import org.apache.ignite.internal.TransactionsMXBeanImpl;
import org.apache.ignite.internal.managers.encryption.EncryptionMXBeanImpl;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMXBeanImpl;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMXBeanImpl;
import org.apache.ignite.internal.processors.cluster.BaselineAutoAdjustMXBeanImpl;
import org.apache.ignite.internal.processors.metric.MetricsMxBeanImpl;
import org.apache.ignite.internal.util.StripedExecutor;
@@ -55,6 +56,7 @@ import org.apache.ignite.mxbean.IgniteMXBean;
import org.apache.ignite.mxbean.MetricsMxBean;
import org.apache.ignite.mxbean.ServiceMXBean;
import org.apache.ignite.mxbean.QueryMXBean;
+import org.apache.ignite.mxbean.SnapshotMXBean;
import org.apache.ignite.mxbean.StripedExecutorMXBean;
import org.apache.ignite.mxbean.ThreadPoolMXBean;
import org.apache.ignite.mxbean.TransactionMetricsMxBean;
@@ -179,6 +181,10 @@ public class IgniteMBeansManager {
registerMBean("Encryption", encryptionMXBean.getClass().getSimpleName(), encryptionMXBean,
EncryptionMXBean.class);
+ // Snapshot.
+ SnapshotMXBean snpMXBean = new SnapshotMXBeanImpl(ctx);
+ registerMBean("Snapshot", snpMXBean.getClass().getSimpleName(), snpMXBean, SnapshotMXBean.class);
+
// Metrics configuration
MetricsMxBean metricsMxBean = new MetricsMxBeanImpl(ctx.metric(), log);
registerMBean("Metrics", metricsMxBean.getClass().getSimpleName(), metricsMxBean, MetricsMxBean.class);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 23b3f19..828f18e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -660,23 +660,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
ctx.cache().onDiscoveryEvent(type, customMsg, node, nextTopVer, ctx.state().clusterState());
}
- if (type == EVT_DISCOVERY_CUSTOM_EVT) {
- for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) {
- List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls);
-
- if (list != null) {
- for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) {
- try {
- lsnr.onCustomEvent(nextTopVer, node, customMsg);
- }
- catch (Exception e) {
- U.error(log, "Failed to notify direct custom event listener: " + customMsg, e);
- }
- }
- }
- }
- }
-
DiscoCache discoCache;
// Put topology snapshot into discovery history.
@@ -739,6 +722,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
}
+ if (type == EVT_DISCOVERY_CUSTOM_EVT) {
+ for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) {
+ List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls);
+
+ if (list != null) {
+ for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) {
+ try {
+ lsnr.onCustomEvent(nextTopVer, node, customMsg);
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to notify direct custom event listener: " + customMsg, e);
+ }
+ }
+ }
+ }
+ }
+
// If this is a local join event, just save it and do not notify listeners.
if (locJoinEvt) {
if (gridStartTime == 0)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
index 7c1e15d..1d9e501 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.pagemem.store;
+import java.io.Closeable;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
@@ -24,7 +25,17 @@ import org.apache.ignite.internal.processors.cache.persistence.StorageException;
/**
* Persistent store of pages.
*/
-public interface PageStore {
+public interface PageStore extends Closeable {
+ /**
+ * @param lsnr Page write listener to set.
+ */
+ public void addWriteListener(PageWriteListener lsnr);
+
+ /**
+ * @param lsnr Page write listener to remove.
+ */
+ public void removeWriteListener(PageWriteListener lsnr);
+
/**
* Checks if page exists.
*
@@ -53,9 +64,10 @@ public interface PageStore {
* @param pageId Page ID.
* @param pageBuf Page buffer to read into.
* @param keepCrc by default reading zeroes CRC which was on file, but you can keep it in pageBuf if set keepCrc
+ * @return {@code true} if page has been read successfully, {@code false} if page hasn't been written yet.
* @throws IgniteCheckedException If reading failed (IO error occurred).
*/
- public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException;
+ public boolean read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException;
/**
* Reads a header.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageWriteListener.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageWriteListener.java
new file mode 100644
index 0000000..2f1b507
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageWriteListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.store;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+
+/**
+ * Each page write attempt to a {@link FilePageStore} may be intercepted by such a listener.
+ * If necessary, the page data can be handled by another process before it is actually
+ * written to the PageStore.
+ */
+@FunctionalInterface
+public interface PageWriteListener {
+ /**
+ * @param pageId Handled page id.
+ * @param buf Buffer with data.
+ */
+ public void accept(long pageId, ByteBuffer buf);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 510cb6e..0062eb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -84,6 +84,7 @@ import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_IN_PROGRESS_ERR_MSG;
/**
* Logic related to cache discovery data processing.
@@ -762,7 +763,8 @@ public class ClusterCachesInfo {
return;
}
- processStopCacheRequest(exchangeActions, req, cacheName, desc);
+ if (!processStopCacheRequest(exchangeActions, req, res, cacheName, desc))
+ return;
needExchange = true;
}
@@ -783,13 +785,27 @@ public class ClusterCachesInfo {
* @param exchangeActions Exchange actions to update.
* @param cacheName Cache name.
* @param desc Dynamic cache descriptor.
+ * @return {@code true} if the stop request can be processed.
*/
- private void processStopCacheRequest(
+ private boolean processStopCacheRequest(
ExchangeActions exchangeActions,
DynamicCacheChangeRequest req,
+ CacheChangeProcessResult res,
String cacheName,
DynamicCacheDescriptor desc
) {
+ if (ctx.cache().context().snapshotMgr().isSnapshotCreating()) {
+ IgniteCheckedException err = new IgniteCheckedException(SNP_IN_PROGRESS_ERR_MSG);
+
+ U.warn(log, err);
+
+ res.errs.add(err);
+
+ ctx.cache().completeCacheStartFuture(req, false, err);
+
+ return false;
+ }
+
DynamicCacheDescriptor old = registeredCaches.get(cacheName);
assert old != null && old == desc : "Dynamic cache map was concurrently modified [req=" + req + ']';
@@ -834,6 +850,8 @@ public class ClusterCachesInfo {
}
}
}
+
+ return true;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index f9e9376..dab45e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -31,6 +31,7 @@ import static org.apache.ignite.internal.IgniteFeatures.PME_FREE_SWITCH;
import static org.apache.ignite.internal.IgniteFeatures.allNodesSupports;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.exchangeProtocolVersion;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.isSnapshotOperation;
/**
*
@@ -76,9 +77,10 @@ public class ExchangeContext {
log.warning("Current topology does not support the PME-free switch. Please check all nodes support" +
" this feature and it was not explicitly disabled by IGNITE_PME_FREE_SWITCH_DISABLED JVM option.");
+ boolean pmeFreeAvailable = (fut.wasRebalanced() && fut.isBaselineNodeFailed()) || isSnapshotOperation(fut.firstEvent());
+
if (!compatibilityNode &&
- fut.wasRebalanced() &&
- fut.isBaselineNodeFailed() &&
+ pmeFreeAvailable &&
allNodesSupportsPmeFreeSwitch) {
exchangeFreeSwitch = true;
merge = false;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index dea3aca..326f24d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -111,6 +111,7 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.Metas
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
@@ -2983,6 +2984,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
WalStateManager walStateMgr = new WalStateManager(ctx);
+ IgniteSnapshotManager snapshotMgr = new IgniteSnapshotManager(ctx);
IgniteCacheSnapshotManager snpMgr = ctx.plugins().createComponent(IgniteCacheSnapshotManager.class);
if (snpMgr == null)
@@ -3010,6 +3012,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
walMgr,
walStateMgr,
dbMgr,
+ snapshotMgr,
snpMgr,
depMgr,
exchMgr,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index e30f57e..ab496be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSnapshot;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.store.CacheStoreSessionListener;
import org.apache.ignite.cluster.ClusterNode;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
@@ -124,6 +126,9 @@ public class GridCacheSharedContext<K, V> {
/** Page store manager. {@code Null} if persistence is not enabled. */
@Nullable private IgnitePageStoreManager pageStoreMgr;
+ /** Snapshot manager for persistent caches. See {@link IgniteSnapshot}. */
+ private IgniteSnapshotManager snapshotMgr;
+
/** Affinity manager. */
private CacheAffinitySharedManager affMgr;
@@ -217,6 +222,7 @@ public class GridCacheSharedContext<K, V> {
@Nullable IgniteWriteAheadLogManager walMgr,
WalStateManager walStateMgr,
IgniteCacheDatabaseSharedManager dbMgr,
+ IgniteSnapshotManager snapshotMgr,
IgniteCacheSnapshotManager snpMgr,
GridCacheDeploymentManager<K, V> depMgr,
GridCachePartitionExchangeManager<K, V> exchMgr,
@@ -242,6 +248,7 @@ public class GridCacheSharedContext<K, V> {
walMgr,
walStateMgr,
dbMgr,
+ snapshotMgr,
snpMgr,
depMgr,
exchMgr,
@@ -421,6 +428,7 @@ public class GridCacheSharedContext<K, V> {
walMgr,
walStateMgr,
dbMgr,
+ snapshotMgr,
snpMgr,
new GridCacheDeploymentManager<K, V>(),
new GridCachePartitionExchangeManager<K, V>(),
@@ -470,6 +478,7 @@ public class GridCacheSharedContext<K, V> {
IgniteWriteAheadLogManager walMgr,
WalStateManager walStateMgr,
IgniteCacheDatabaseSharedManager dbMgr,
+ IgniteSnapshotManager snapshotMgr,
IgniteCacheSnapshotManager snpMgr,
GridCacheDeploymentManager<K, V> depMgr,
GridCachePartitionExchangeManager<K, V> exchMgr,
@@ -489,6 +498,7 @@ public class GridCacheSharedContext<K, V> {
this.walMgr = add(mgrs, walMgr);
this.walStateMgr = add(mgrs, walStateMgr);
this.dbMgr = add(mgrs, dbMgr);
+ this.snapshotMgr = add(mgrs, snapshotMgr);
this.snpMgr = add(mgrs, snpMgr);
this.jtaMgr = add(mgrs, jtaMgr);
this.depMgr = add(mgrs, depMgr);
@@ -740,6 +750,13 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @return Page storage snapshot manager.
+ */
+ public IgniteSnapshotManager snapshotMgr() {
+ return snapshotMgr;
+ }
+
+ /**
* @return Write ahead log manager.
*/
public IgniteWriteAheadLogManager wal() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
index 924cf14..a3ffa62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
@@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
@@ -40,7 +41,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.Nullable;
/**
* Class handles saving/restoring binary metadata to/from disk.
@@ -74,41 +74,34 @@ class BinaryMetadataFileStore {
* @param metadataLocCache Metadata locale cache.
* @param ctx Context.
* @param log Logger.
- * @param binaryMetadataFileStoreDir Path to binary metadata store configured by user, should include binary_meta
- * and consistentId
*/
BinaryMetadataFileStore(
final ConcurrentMap<Integer, BinaryMetadataHolder> metadataLocCache,
final GridKernalContext ctx,
final IgniteLogger log,
- @Nullable final File binaryMetadataFileStoreDir
- ) throws IgniteCheckedException {
+ final File workDir
+ ) {
this.metadataLocCache = metadataLocCache;
this.ctx = ctx;
this.isPersistenceEnabled = CU.isPersistenceEnabled(ctx.config());
this.log = log;
+ this.workDir = workDir;
- if (!CU.isPersistenceEnabled(ctx.config()))
+ if (!CU.isPersistenceEnabled(ctx.config())) {
return;
+ }
fileIOFactory = ctx.config().getDataStorageConfiguration().getFileIOFactory();
+ }
- if (binaryMetadataFileStoreDir != null)
- workDir = binaryMetadataFileStoreDir;
- else {
- final String subFolder = ctx.pdsFolderResolver().resolveFolders().folderName();
-
- workDir = new File(U.resolveWorkDirectory(
- ctx.config().getWorkDirectory(),
- "binary_meta",
- false
- ),
- subFolder);
- }
-
+ /**
+ * Starts worker thread for async writing of binary metadata.
+ */
+ void start() throws IgniteCheckedException {
U.ensureDirectory(workDir, "directory for serialized binary metadata", log);
writer = new BinaryMetadataAsyncWriter();
+
new IgniteThread(writer).start();
}
@@ -145,7 +138,7 @@ class BinaryMetadataFileStore {
U.error(log, msg);
- writer.cancel();
+ U.cancel(writer);
ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 46632b7..f114bf7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -125,6 +125,9 @@ import static org.apache.ignite.internal.binary.BinaryUtils.mergeMetadata;
* Binary processor implementation.
*/
public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter implements IgniteCacheObjectProcessor {
+ /** Binary metadata file store folder. */
+ public static final String BINARY_META_FOLDER = "binary_meta";
+
/** Immutable classes. */
private static final Collection<Class<?>> IMMUTABLE_CLS = new HashSet<>();
@@ -201,11 +204,42 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
marsh = ctx.grid().configuration().getMarshaller();
}
+ /**
+ * @param igniteWorkDir Basic ignite working directory.
+ * @param consId Node consistent id.
+ * @return Working directory.
+ */
+ public static File resolveBinaryWorkDir(String igniteWorkDir, String consId) {
+ try {
+ File workDir = new File(U.resolveWorkDirectory(
+ igniteWorkDir,
+ BINARY_META_FOLDER,
+ false),
+ consId);
+
+ U.ensureDirectory(workDir, "directory for serialized binary metadata", null);
+
+ return workDir;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
if (marsh instanceof BinaryMarshaller) {
- if (!ctx.clientNode())
- metadataFileStore = new BinaryMetadataFileStore(metadataLocCache, ctx, log, binaryMetadataFileStoreDir);
+ if (!ctx.clientNode()) {
+ metadataFileStore = new BinaryMetadataFileStore(metadataLocCache,
+ ctx,
+ log,
+ binaryMetadataFileStoreDir == null ?
+ resolveBinaryWorkDir(ctx.config().getWorkDirectory(),
+ ctx.pdsFolderResolver().resolveFolders().folderName()) :
+ binaryMetadataFileStoreDir);
+
+ metadataFileStore.start();
+ }
transport = new BinaryMetadataTransport(metadataLocCache, metadataFileStore, ctx, log);
@@ -886,6 +920,23 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
}
/** {@inheritDoc} */
+ @Override public void saveMetadata(Collection<BinaryType> types, File dir) {
+ try {
+ BinaryMetadataFileStore writer = new BinaryMetadataFileStore(new ConcurrentHashMap<>(),
+ ctx,
+ log,
+ resolveBinaryWorkDir(dir.getAbsolutePath(),
+ ctx.pdsFolderResolver().resolveFolders().folderName()));
+
+ for (BinaryType type : types)
+ writer.mergeAndWriteMetadata(((BinaryTypeImpl)type).metadata());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public BinaryObject buildEnum(String typeName, int ord) throws BinaryObjectException {
A.notNullOrEmpty(typeName, "enum type name");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 926abdd..e907c20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -140,6 +140,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverJoinEvent;
import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverLeftEvent;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.isSnapshotOperation;
import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;
import static org.apache.ignite.internal.util.IgniteUtils.doInParallelUninterruptibly;
@@ -799,7 +800,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
ExchangeType exchange;
if (exchCtx.exchangeFreeSwitch()) {
- exchange = onExchangeFreeSwitch();
+ exchange = isSnapshotOperation(firstDiscoEvt) ? onCustomMessageNoAffinityChange() :
+ onExchangeFreeSwitchNodeLeft();
initCoordinatorCaches(newCrd);
}
@@ -937,6 +939,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
comp.onInitAfterTopologyLock(this);
+ // For pme-free exchanges onInitAfterTopologyLock must be
+ // invoked prior to onDoneBeforeTopologyUnlock.
+ if (exchange == ExchangeType.ALL && context().exchangeFreeSwitch()) {
+ cctx.exchange().exchangerBlockingSectionBegin();
+
+ try {
+ onDone(initialVersion());
+ }
+ finally {
+ cctx.exchange().exchangerBlockingSectionEnd();
+ }
+ }
+
if (exchLog.isInfoEnabled())
exchLog.info("Finished exchange init [topVer=" + topVer + ", crd=" + crdNode + ']');
}
@@ -1404,7 +1419,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @return Exchange type.
*/
- private ExchangeType onExchangeFreeSwitch() {
+ private ExchangeType onExchangeFreeSwitchNodeLeft() {
assert !firstDiscoEvt.eventNode().isClient() : this;
assert firstDiscoEvt.type() == EVT_NODE_LEFT || firstDiscoEvt.type() == EVT_NODE_FAILED;
@@ -1511,7 +1526,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
boolean skipWaitOnLocalJoin = localJoinExchange()
&& cctx.exchange().latch().canSkipJoiningNodes(initialVersion());
- if (context().exchangeFreeSwitch())
+ if (context().exchangeFreeSwitch() && isBaselineNodeFailed())
waitPartitionRelease(true, false);
else if (!skipWaitOnLocalJoin) { // Skip partition release if node has locally joined (it doesn't have any updates to be finished).
boolean distributed = true;
@@ -1608,9 +1623,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.exchange().exchangerBlockingSectionBegin();
try {
- if (context().exchangeFreeSwitch())
- onDone(initialVersion());
- else {
+ if (!context().exchangeFreeSwitch()) {
if (crd.isLocal()) {
if (remaining.isEmpty()) {
initFut.onDone(true);
@@ -1719,7 +1732,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (distributed)
releaseLatch = cctx.exchange().latch().getOrCreate(DISTRIBUTED_LATCH_ID, initialVersion());
- partReleaseFut = context().exchangeFreeSwitch() ?
+ partReleaseFut = context().exchangeFreeSwitch() && isBaselineNodeFailed() ?
cctx.partitionRecoveryFuture(initialVersion(), firstDiscoEvt.eventNode()) :
cctx.partitionReleaseFuture(initialVersion());
@@ -2950,7 +2963,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
if (finishState0 == null) {
- assert firstDiscoEvt.type() == EVT_NODE_JOINED && firstDiscoEvt.eventNode().isClient() : this;
+ assert (firstDiscoEvt.type() == EVT_NODE_JOINED && firstDiscoEvt.eventNode().isClient())
+ || isSnapshotOperation(firstDiscoEvt) : GridDhtPartitionsExchangeFuture.this;
ClusterNode node = cctx.node(nodeId);
@@ -3957,6 +3971,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Set<Integer> parts;
if (exchCtx.exchangeFreeSwitch()) {
+ assert !isSnapshotOperation(firstDiscoEvt) : "Not allowed for taking snapshots: " + this;
+
// Previous topology to resolve failed primaries set.
AffinityTopologyVersion topVer = sharedContext().exchange().readyAffinityVersion();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
index 7c6938e..433ef27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence;
import java.util.concurrent.Executor;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
import org.jetbrains.annotations.Nullable;
@@ -36,6 +37,11 @@ public interface DbCheckpointListener {
public boolean nextSnapshot();
/**
+ * @return Checkpoint future which will be completed when checkpoint ends.
+ */
+ public IgniteInternalFuture<?> finishedStateFut();
+
+ /**
* @return Partition allocation statistic map
*/
public PartitionAllocationMap partitionStatMap();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 068fd5a..9d9433b 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -68,7 +68,6 @@ import java.util.function.ToLongFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-
import org.apache.ignite.DataRegionMetricsProvider;
import org.apache.ignite.DataStorageMetrics;
import org.apache.ignite.IgniteCheckedException;
@@ -159,8 +158,8 @@ import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.GridCountDownCallback;
import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue;
+import org.apache.ignite.internal.util.GridCountDownCallback;
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.StripedExecutor;
@@ -1603,6 +1602,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
) {
Map<PageMemoryEx, Collection<Integer>> destroyed = new HashMap<>();
+ cctx.snapshotMgr().onCacheGroupsStopped(stoppedGrps.stream()
+ .filter(IgniteBiTuple::get2)
+ .map(t -> t.get1().groupId())
+ .collect(Collectors.toList()));
+
for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
CacheGroupContext gctx = tup.get1();
@@ -3945,6 +3949,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
CheckpointProgressImpl curr = scheduledCp;
+ List<DbCheckpointListener> dbLsnrs = new ArrayList<>(lsnrs);
+
CheckpointRecord cpRec = new CheckpointRecord(memoryRecoveryRecordPtr);
memoryRecoveryRecordPtr = null;
@@ -3964,7 +3970,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
internalReadLock();
try {
- for (DbCheckpointListener lsnr : lsnrs)
+ for (DbCheckpointListener lsnr : dbLsnrs)
lsnr.beforeCheckpointBegin(ctx0);
ctx0.awaitPendingTasksFinished();
@@ -3985,7 +3991,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
tracker.onMarkStart();
// Listeners must be invoked before we write checkpoint record to WAL.
- for (DbCheckpointListener lsnr : lsnrs)
+ for (DbCheckpointListener lsnr : dbLsnrs)
lsnr.onMarkCheckpointBegin(ctx0);
ctx0.awaitPendingTasksFinished();
@@ -4038,7 +4044,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
curr.transitTo(LOCK_RELEASED);
- for (DbCheckpointListener lsnr : lsnrs)
+ for (DbCheckpointListener lsnr : dbLsnrs)
lsnr.onCheckpointBegin(ctx);
if (snapFut != null) {
@@ -4274,6 +4280,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> finishedStateFut() {
+ return delegate.finishedStateFut();
+ }
+
+ /** {@inheritDoc} */
@Override public PartitionAllocationMap partitionStatMap() {
return delegate.partitionStatMap();
}
@@ -4383,6 +4394,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> finishedStateFut() {
+ return curr.futureFor(FINISHED);
+ }
+
+ /** {@inheritDoc} */
@Override public PartitionAllocationMap partitionStatMap() {
return map;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index d9749f4..142d0a1 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -25,6 +25,8 @@ import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -34,6 +36,7 @@ import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.pagemem.store.PageWriteListener;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
@@ -87,6 +90,9 @@ public class FilePageStore implements PageStore {
/** Region metrics updater. */
private final LongAdderMetric allocatedTracker;
+ /** List of listeners for current page store to handle. */
+ private final List<PageWriteListener> lsnrs = new CopyOnWriteArrayList<>();
+
/** */
protected final int pageSize;
@@ -123,6 +129,16 @@ public class FilePageStore implements PageStore {
}
/** {@inheritDoc} */
+ @Override public void addWriteListener(PageWriteListener lsnr) {
+ lsnrs.add(lsnr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeWriteListener(PageWriteListener lsnr) {
+ lsnrs.remove(lsnr);
+ }
+
+ /** {@inheritDoc} */
@Override public int getPageSize() {
return pageSize;
}
@@ -297,8 +313,11 @@ public class FilePageStore implements PageStore {
return fileSize;
}
- /** {@inheritDoc} */
- @Override public void stop(boolean delete) throws StorageException {
+ /**
+ * @param delete {@code True} to delete file.
+ * @throws IOException If failed.
+ */
+ private void stop0(boolean delete) throws IOException {
lock.writeLock().lock();
try {
@@ -324,10 +343,6 @@ public class FilePageStore implements PageStore {
fileExists = false;
}
}
- catch (IOException e) {
- throw new StorageException("Failed to stop serving partition file [file=" + getFileAbsolutePath()
- + ", delete=" + delete + "]", e);
- }
finally {
allocatedTracker.add(-1L * allocated.getAndSet(0) / pageSize);
@@ -338,6 +353,22 @@ public class FilePageStore implements PageStore {
}
/** {@inheritDoc} */
+ @Override public void stop(boolean delete) throws StorageException {
+ try {
+ stop0(delete);
+ }
+ catch (IOException e) {
+ throw new StorageException("Failed to stop serving partition file [file=" + getFileAbsolutePath()
+ + ", delete=" + delete + "]", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IOException {
+ stop0(false);
+ }
+
+ /** {@inheritDoc} */
@Override public void truncate(int tag) throws StorageException {
init();
@@ -433,7 +464,7 @@ public class FilePageStore implements PageStore {
}
/** {@inheritDoc} */
- @Override public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException {
+ @Override public boolean read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException {
init();
try {
@@ -453,7 +484,7 @@ public class FilePageStore implements PageStore {
if (n < 0) {
pageBuf.put(new byte[pageBuf.remaining()]);
- return;
+ return false;
}
int savedCrc32 = PageIO.getCrc(pageBuf);
@@ -478,6 +509,8 @@ public class FilePageStore implements PageStore {
if (keepCrc)
PageIO.setCrc(pageBuf, savedCrc32);
+
+ return true;
}
catch (IOException e) {
throw new StorageException("Failed to read page [file=" + getFileAbsolutePath() + ", pageId=" + pageId + "]", e);
@@ -501,7 +534,7 @@ public class FilePageStore implements PageStore {
/**
* @throws StorageException If failed to initialize store file.
*/
- private void init() throws StorageException {
+ public void init() throws StorageException {
if (!inited) {
lock.writeLock().lock();
@@ -675,6 +708,12 @@ public class FilePageStore implements PageStore {
assert pageBuf.position() == 0 : pageBuf.position();
+ for (PageWriteListener lsnr : lsnrs) {
+ lsnr.accept(pageId, pageBuf);
+
+ pageBuf.rewind();
+ }
+
fileIO.writeFully(pageBuf, off);
PageIO.setCrc(pageBuf, 0);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index a6712db..2ee7e4d 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -37,12 +37,15 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -89,9 +92,12 @@ import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static java.lang.String.format;
import static java.nio.file.Files.delete;
import static java.nio.file.Files.newDirectoryStream;
import static java.util.Objects.requireNonNull;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.MAX_PARTITION_ID;
/**
* File page store manager.
@@ -137,6 +143,12 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
public static final PathMatcher TMP_FILE_MATCHER =
FileSystems.getDefault().getPathMatcher("glob:**" + TMP_SUFFIX);
+ /** Listeners of configuration changes e.g. overwrite or remove actions. */
+ private final List<BiConsumer<String, File>> lsnrs = new CopyOnWriteArrayList<>();
+
+ /** Lock which guards configuration changes. */
+ private final ReentrantReadWriteLock chgLock = new ReentrantReadWriteLock();
+
/** Marshaller. */
private final Marshaller marshaller;
@@ -430,20 +442,18 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
/** {@inheritDoc} */
@Override public void storeCacheData(StoredCacheData cacheData, boolean overwrite) throws IgniteCheckedException {
- File cacheWorkDir = cacheWorkDir(cacheData.config());
- File file;
+ CacheConfiguration<?, ?> ccfg = cacheData.config();
+ File cacheWorkDir = cacheWorkDir(ccfg);
checkAndInitCacheWorkDir(cacheWorkDir);
assert cacheWorkDir.exists() : "Work directory does not exist: " + cacheWorkDir;
- if (cacheData.config().getGroupName() != null)
- file = new File(cacheWorkDir, cacheData.config().getName() + CACHE_DATA_FILENAME);
- else
- file = new File(cacheWorkDir, CACHE_DATA_FILENAME);
-
+ File file = cacheConfigurationFile(ccfg);
Path filePath = file.toPath();
+ chgLock.readLock().lock();
+
try {
if (overwrite || !Files.exists(filePath) || Files.size(filePath) == 0) {
File tmp = new File(file.getParent(), file.getName() + TMP_SUFFIX);
@@ -458,16 +468,40 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
marshaller.marshal(cacheData, stream);
}
+ if (Files.exists(filePath) && Files.size(filePath) > 0) {
+ for (BiConsumer<String, File> lsnr : lsnrs)
+ lsnr.accept(ccfg.getName(), file);
+ }
+
Files.move(tmp.toPath(), file.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
}
catch (IOException ex) {
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, ex));
- throw new IgniteCheckedException("Failed to persist cache configuration: " + cacheData.config().getName(), ex);
+ throw new IgniteCheckedException("Failed to persist cache configuration: " + ccfg.getName(), ex);
+ }
+ finally {
+ chgLock.readLock().unlock();
}
}
+ /**
+ * @param lsnr Instance of listener to add.
+ */
+ public void addConfigurationChangeListener(BiConsumer<String, File> lsnr) {
+ assert chgLock.isWriteLockedByCurrentThread();
+
+ lsnrs.add(lsnr);
+ }
+
+ /**
+ * @param lsnr Instance of listener to remove.
+ */
+ public void removeConfigurationChangeListener(BiConsumer<String, File> lsnr) {
+ lsnrs.remove(lsnr);
+ }
+
/** {@inheritDoc} */
@Override public void shutdownForCacheGroup(CacheGroupContext grp, boolean destroy) throws IgniteCheckedException {
grpsWithoutIdx.remove(grp.groupId());
@@ -492,7 +526,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
/** {@inheritDoc} */
@Override public void onPartitionDestroyed(int grpId, int partId, int tag) throws IgniteCheckedException {
- assert partId <= PageIdAllocator.MAX_PARTITION_ID;
+ assert partId <= MAX_PARTITION_ID;
PageStore store = getStore(grpId, partId);
@@ -651,6 +685,47 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
}
/**
+ * Builds the page store factory for a cache group. When the group is encrypted, both the current and
+ * the V1 page store I/O factories are wrapped into encrypting factories. Those factories then receive
+ * the header size of the latest page store version.
+ *
+ * @param grpId Cache group id.
+ * @param encrypted {@code true} if cache group encryption enabled.
+ * @return Factory to create page stores.
+ */
+ public FilePageStoreFactory getPageStoreFactory(int grpId, boolean encrypted) {
+ if (!encrypted) {
+ return new FileVersionCheckingFactory(
+ pageStoreFileIoFactory,
+ pageStoreV1FileIoFactory,
+ igniteCfg.getDataStorageConfiguration());
+ }
+
+ EncryptedFileIOFactory encIoFactory = new EncryptedFileIOFactory(
+ pageStoreFileIoFactory,
+ grpId,
+ pageSize(),
+ cctx.kernalContext().encryption(),
+ cctx.gridConfig().getEncryptionSpi());
+
+ EncryptedFileIOFactory encV1IoFactory = new EncryptedFileIOFactory(
+ pageStoreV1FileIoFactory,
+ grpId,
+ pageSize(),
+ cctx.kernalContext().encryption(),
+ cctx.gridConfig().getEncryptionSpi());
+
+ FileVersionCheckingFactory versionedFactory = new FileVersionCheckingFactory(
+ encIoFactory,
+ encV1IoFactory,
+ igniteCfg.getDataStorageConfiguration());
+
+ // The header size comes from the version-checking factory, so it can only be set afterwards.
+ int hdrSize = versionedFactory.headerSize(versionedFactory.latestVersion());
+
+ encIoFactory.headerSize(hdrSize);
+ encV1IoFactory.headerSize(hdrSize);
+
+ return versionedFactory;
+ }
+
+ /**
* @param cacheWorkDir Work directory.
* @param grpId Group ID.
* @param partitions Number of partitions.
@@ -672,43 +747,13 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
if (dirExisted && !idxFile.exists())
grpsWithoutIdx.add(grpId);
- FileIOFactory pageStoreFileIoFactory = this.pageStoreFileIoFactory;
- FileIOFactory pageStoreV1FileIoFactory = this.pageStoreV1FileIoFactory;
-
- if (encrypted) {
- pageStoreFileIoFactory = new EncryptedFileIOFactory(
- this.pageStoreFileIoFactory,
- grpId,
- pageSize(),
- cctx.kernalContext().encryption(),
- cctx.gridConfig().getEncryptionSpi());
-
- pageStoreV1FileIoFactory = new EncryptedFileIOFactory(
- this.pageStoreV1FileIoFactory,
- grpId,
- pageSize(),
- cctx.kernalContext().encryption(),
- cctx.gridConfig().getEncryptionSpi());
- }
-
- FileVersionCheckingFactory pageStoreFactory = new FileVersionCheckingFactory(
- pageStoreFileIoFactory,
- pageStoreV1FileIoFactory,
- igniteCfg.getDataStorageConfiguration()
- );
-
- if (encrypted) {
- int headerSize = pageStoreFactory.headerSize(pageStoreFactory.latestVersion());
-
- ((EncryptedFileIOFactory)pageStoreFileIoFactory).headerSize(headerSize);
- ((EncryptedFileIOFactory)pageStoreV1FileIoFactory).headerSize(headerSize);
- }
+ FilePageStoreFactory pageStoreFactory = getPageStoreFactory(grpId, encrypted);
PageStore idxStore =
- pageStoreFactory.createPageStore(
- PageMemory.FLAG_IDX,
- idxFile,
- allocatedTracker);
+ pageStoreFactory.createPageStore(
+ PageMemory.FLAG_IDX,
+ idxFile,
+ allocatedTracker);
PageStore[] partStores = new PageStore[partitions];
@@ -739,7 +784,27 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
* @param partId Partition id.
*/
@NotNull private Path getPartitionFilePath(File cacheWorkDir, int partId) {
- return new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE, partId)).toPath();
+ // Delegate to the shared naming rule, which also maps INDEX_PARTITION to the index file name.
+ return new File(cacheWorkDir, getPartitionFileName(partId)).toPath();
+ }
+
+ /**
+ * @param workDir Cache work directory.
+ * @param cacheDirName Cache directory name.
+ * @param partId Partition id.
+ * @return Partition file.
+ */
+ @NotNull public static File getPartitionFile(File workDir, String cacheDirName, int partId) {
+ return new File(cacheWorkDir(workDir, cacheDirName), getPartitionFileName(partId));
+ }
+
+ /**
+ * Maps a partition id to its file name: the index file for {@code INDEX_PARTITION}, otherwise the
+ * partition file name built from {@code PART_FILE_TEMPLATE}.
+ *
+ * @param partId Partition id.
+ * @return File name.
+ */
+ public static String getPartitionFileName(int partId) {
+ assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION;
+
+ if (partId == INDEX_PARTITION)
+ return INDEX_FILE_NAME;
+
+ return format(PART_FILE_TEMPLATE, partId);
}
/** {@inheritDoc} */
@@ -856,7 +921,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
/** {@inheritDoc} */
@Override public long allocatePage(int grpId, int partId, byte flags) throws IgniteCheckedException {
- assert partId <= PageIdAllocator.MAX_PARTITION_ID || partId == PageIdAllocator.INDEX_PARTITION;
+ assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION;
PageStore store = getStore(grpId, partId);
@@ -884,6 +949,36 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
return store.pages();
}
+ /**
+ * Hands every cache configuration file found in the work directories of the given caches to a consumer.
+ * The whole scan runs under the write lock of {@code chgLock}. Configuration files therefore cannot be
+ * stored or removed by this manager meanwhile, because those operations take the read lock.
+ *
+ * @param ccfgs List of cache configurations to process.
+ * @param ccfgCons Consumer which accepts found configurations files.
+ */
+ public void readConfigurationFiles(List<CacheConfiguration<?, ?>> ccfgs,
+ BiConsumer<CacheConfiguration<?, ?>, File> ccfgCons) {
+ chgLock.writeLock().lock();
+
+ try {
+ for (CacheConfiguration<?, ?> ccfg : ccfgs) {
+ File cacheDir = cacheWorkDir(ccfg);
+
+ if (cacheDir.exists()) {
+ File[] found = cacheDir.listFiles((dir, name) -> name.endsWith(CACHE_DATA_FILENAME));
+
+ if (found != null) {
+ for (File cfgFile : found)
+ ccfgCons.accept(ccfg, cfgFile);
+ }
+ }
+ }
+ }
+ finally {
+ chgLock.writeLock().unlock();
+ }
+ }
+
/** {@inheritDoc} */
@Override public Map<String, StoredCacheData> readCacheConfigurations() throws IgniteCheckedException {
if (cctx.kernalContext().clientNode())
@@ -997,24 +1092,45 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
* @param ccfg Cache configuration.
* @return Store dir for given cache.
*/
- public File cacheWorkDir(CacheConfiguration ccfg) {
- boolean isSharedGrp = ccfg.getGroupName() != null;
-
- return cacheWorkDir(isSharedGrp, isSharedGrp ? ccfg.getGroupName() : ccfg.getName());
+ public File cacheWorkDir(CacheConfiguration<?, ?> ccfg) {
+ return cacheWorkDir(storeWorkDir, cacheDirName(ccfg));
}
/**
- *
+ * Resolves a cache (or cache group) directory under this manager's store work directory.
+ *
+ * @param isSharedGroup {@code True} if the name is a shared cache group name (group directory prefix is used).
+ * @param cacheOrGroupName Cache group name if {@code isSharedGroup} is {@code true}, otherwise cache name.
+ * @return Store directory for given cache.
*/
public File cacheWorkDir(boolean isSharedGroup, String cacheOrGroupName) {
- String dirName;
+ return cacheWorkDir(storeWorkDir, cacheDirName(isSharedGroup, cacheOrGroupName));
+ }
- if (isSharedGroup)
- dirName = CACHE_GRP_DIR_PREFIX + cacheOrGroupName;
- else
- dirName = CACHE_DIR_PREFIX + cacheOrGroupName;
+ /**
+ * Resolves a cache directory inside an arbitrary store directory.
+ *
+ * @param storeWorkDir Store work directory which contains cache directories.
+ * @param cacheDirName Cache directory name.
+ * @return Directory {@code cacheDirName} inside {@code storeWorkDir}.
+ */
+ public static File cacheWorkDir(File storeWorkDir, String cacheDirName) {
+ return new File(storeWorkDir, cacheDirName);
+ }
+
+ /**
+ * Builds a cache directory name by prepending the group or the cache directory prefix.
+ *
+ * @param isSharedGroup {@code True} if the name is a shared cache group name (group directory prefix is used).
+ * @param cacheOrGroupName Cache group name if {@code isSharedGroup} is {@code true}, otherwise cache name.
+ * @return The full cache directory name.
+ */
+ public static String cacheDirName(boolean isSharedGroup, String cacheOrGroupName) {
+ return isSharedGroup ? CACHE_GRP_DIR_PREFIX + cacheOrGroupName
+ : CACHE_DIR_PREFIX + cacheOrGroupName;
+ }
+
+ /**
+ * Builds the directory name for a cache configuration: the group directory when the cache belongs
+ * to a cache group, otherwise the cache's own directory.
+ *
+ * @param ccfg Cache configuration.
+ * @return The full cache directory name.
+ */
+ public static String cacheDirName(CacheConfiguration<?, ?> ccfg) {
+ boolean isSharedGrp = ccfg.getGroupName() != null;
- return new File(storeWorkDir, dirName);
+ return cacheDirName(isSharedGrp, isSharedGrp ? ccfg.getGroupName() : ccfg.getName());
}
/**
@@ -1075,22 +1191,37 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
/** {@inheritDoc} */
@Override public void removeCacheData(StoredCacheData cacheData) throws IgniteCheckedException {
- CacheConfiguration cacheCfg = cacheData.config();
- File cacheWorkDir = cacheWorkDir(cacheCfg);
- File file;
+ // Read lock: removal waits while readConfigurationFiles holds the write lock.
+ chgLock.readLock().lock();
- if (cacheData.config().getGroupName() != null)
- file = new File(cacheWorkDir, cacheCfg.getName() + CACHE_DATA_FILENAME);
- else
- file = new File(cacheWorkDir, CACHE_DATA_FILENAME);
+ try {
+ CacheConfiguration<?, ?> ccfg = cacheData.config();
+ File file = cacheConfigurationFile(ccfg);
- if (file.exists()) {
- if (!file.delete())
- throw new IgniteCheckedException("Failed to delete cache configuration: " + cacheCfg.getName());
+ if (file.exists()) {
+ // Notify listeners while the file still exists, i.e. before it is deleted.
+ for (BiConsumer<String, File> lsnr : lsnrs)
+ lsnr.accept(ccfg.getName(), file);
+
+ if (!file.delete())
+ throw new IgniteCheckedException("Failed to delete cache configuration: " + ccfg.getName());
+ }
+ }
+ finally {
+ chgLock.readLock().unlock();
}
}
/**
+ * Resolves the file holding a cache configuration. Inside a shared group directory the file name is
+ * prefixed with the cache name; a standalone cache directory uses the plain configuration file name.
+ *
+ * @param ccfg Cache configuration.
+ * @return Cache configuration file with respect to {@link CacheConfiguration#getGroupName} value.
+ */
+ private File cacheConfigurationFile(CacheConfiguration<?, ?> ccfg) {
+ String cfgFileName = ccfg.getGroupName() != null
+ ? ccfg.getName() + CACHE_DATA_FILENAME
+ : CACHE_DATA_FILENAME;
+
+ return new File(cacheWorkDir(ccfg), cfgFileName);
+ }
+
+ /**
* @param store Store to shutdown.
* @param cleanFile {@code True} if files should be cleaned.
* @param aggr Aggregating exception.
@@ -1167,10 +1298,10 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
throw new IgniteCheckedException("Failed to get page store for the given cache ID " +
"(cache has not been started): " + grpId);
- if (partId == PageIdAllocator.INDEX_PARTITION)
+ if (partId == INDEX_PARTITION)
return holder.idxStore;
- if (partId > PageIdAllocator.MAX_PARTITION_ID)
+ if (partId > MAX_PARTITION_ID)
throw new IgniteCheckedException("Partition ID is reserved: " + partId);
PageStore store = holder.partStores[partId];
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java
index dbdf670..c236827 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java
@@ -52,7 +52,7 @@ public class GroupPartitionId implements Comparable<GroupPartitionId> {
* @param partId Partition ID.
* @return flag to be used for partition
*/
- private static byte getFlagByPartId(final int partId) {
+ public static byte getFlagByPartId(final int partId) {
+ // The index partition is flagged as index pages; every other partition as data pages.
return partId == PageIdAllocator.INDEX_PARTITION ? PageMemory.FLAG_IDX : PageMemory.FLAG_DATA;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
index 21849b3..c377582 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.util.UUID;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.GridKernalContext;
@@ -36,7 +35,10 @@ import org.jetbrains.annotations.Nullable;
/**
* Snapshot manager stub.
+ *
+ * @deprecated Use {@link IgniteSnapshotManager}.
*/
+@Deprecated
public class IgniteCacheSnapshotManager<T extends SnapshotOperation> extends GridCacheSharedManagerAdapter implements IgniteChangeGlobalStateSupport {
/** Snapshot started lock filename. */
public static final String SNAPSHOT_RESTORE_STARTED_LOCK_FILENAME = "snapshot-started.loc";
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
new file mode 100644
index 0000000..ae8203a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -0,0 +1,1233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSnapshot;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteFeatures;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.CacheType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.util.GridBusyLock;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.apache.ignite.thread.OomExceptionHandler;
+import org.jetbrains.annotations.Nullable;
+
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.ignite.cluster.ClusterState.active;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.IgniteFeatures.PERSISTENCE_CACHE_SNAPSHOT;
+import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
+import static org.apache.ignite.internal.MarshallerContextImpl.saveMappings;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.resolveBinaryWorkDir;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
+import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor.DB_DEFAULT_FOLDER;
+import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getFlagByPartId;
+import static org.apache.ignite.internal.util.IgniteUtils.isLocalNodeCoordinator;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.END_SNAPSHOT;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.START_SNAPSHOT;
+
+/**
+ * Internal implementation of snapshot operations over persistence caches.
+ * <p>
+ * The following major actions are available:
+ * <ul>
+ * <li>Create a cluster-wide snapshot of cache groups, triggering a partition map exchange (PME) to achieve consistency.</li>
+ * </ul>
+ */
+public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
+ implements IgniteSnapshot, PartitionsExchangeAware, MetastorageLifecycleListener {
+ /** Suffix of a file containing partition delta pages. */
+ public static final String DELTA_SUFFIX = ".delta";
+
+ /** File name template of a partition delta-pages file. */
+ public static final String PART_DELTA_TEMPLATE = PART_FILE_TEMPLATE + DELTA_SUFFIX;
+
+ /** File name of the index partition delta-pages file. */
+ public static final String INDEX_DELTA_NAME = INDEX_FILE_NAME + DELTA_SUFFIX;
+
+ /** Format of the reason text for a checkpoint started to enforce a snapshot operation. */
+ public static final String CP_SNAPSHOT_REASON = "Checkpoint started to enforce snapshot operation: %s";
+
+ /** Default name of the temporary snapshot directory (snapshots loaded from remote nodes, temporary delta files). */
+ public static final String DFLT_SNAPSHOT_TMP_DIR = "snp";
+
+ /** Error message used when an operation is rejected because a snapshot operation is in progress. */
+ public static final String SNP_IN_PROGRESS_ERR_MSG = "Operation rejected due to the snapshot operation in progress.";
+
+ /** Error message used to cancel snapshot tasks and futures when the local node is stopping. */
+ public static final String SNP_NODE_STOPPING_ERR_MSG = "Snapshot has been cancelled due to the local node " +
+ "is stopping";
+
+ /** Metastorage key to save currently running snapshot. */
+ public static final String SNP_RUNNING_KEY = "snapshot-running";
+
+ /** Name of the snapshot metric registry. */
+ public static final String SNAPSHOT_METRICS = "snapshot";
+
+ /** Name prefix of snapshot runner threads. */
+ private static final String SNAPSHOT_RUNNER_THREAD_PREFIX = "snapshot-runner";
+
+ /** Number of threads in the pool performing local snapshots. */
+ private static final int SNAPSHOT_THREAD_POOL_SIZE = 4;
+
+ /**
+ * Local buffer to perform copy-on-write operations with pages for {@code SnapshotFutureTask.PageStoreSerialWriter}s.
+ * It is important to have only one buffer per thread instead of a separate buffer for
+ * each {@code SnapshotFutureTask.PageStoreSerialWriter}: that would be redundant and can lead to OOM errors. A direct
+ * buffer is deallocated only when its ByteBuffer is garbage collected, so off-heap memory may run out before that.
+ */
+ private final ThreadLocal<ByteBuffer> locBuff;
+
+ /** Map of registered local snapshot tasks (keys presumably snapshot names — confirm in registerSnapshotTask). */
+ private final ConcurrentMap<String, SnapshotFutureTask> locSnpTasks = new ConcurrentHashMap<>();
+
+ /** Busy lock guarding manager resources; blocked while the manager is stopping. */
+ private final GridBusyLock busyLock = new GridBusyLock();
+
+ /** Mutex used to order cluster snapshot operation progress. */
+ private final Object snpOpMux = new Object();
+
+ /** Take snapshot operation procedure. */
+ private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> startSnpProc;
+
+ /** Check previously performed snapshot operation and delete uncompleted files if needed. */
+ private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> endSnpProc;
+
+ /** Resolved persistent data storage settings. */
+ private volatile PdsFolderSettings pdsSettings;
+
+ /** Fully initialized metastorage. */
+ private volatile ReadWriteMetastorage metaStorage;
+
+ /** Local snapshot sender factory. */
+ private Function<String, SnapshotSender> locSndrFactory = LocalSnapshotSender::new;
+
+ /** Main snapshot directory to save created snapshots. */
+ private volatile File locSnpDir;
+
+ /**
+ * Working directory for snapshots loaded from remote nodes and for
+ * temporary partition delta files of a locally started snapshot process.
+ */
+ private File tmpWorkDir;
+
+ /** Factory of file I/O used to work with delta files. */
+ private volatile FileIOFactory ioFactory = new RandomAccessFileIOFactory();
+
+ /** Factory to create page store for restore. */
+ private volatile BiFunction<Integer, Boolean, FilePageStoreFactory> storeFactory;
+
+ /** Snapshot thread pool to perform local partition snapshots. */
+ private ExecutorService snpRunner;
+
+ /** Discovery listener which fails local snapshot tasks on node left/failed events. */
+ private DiscoveryEventListener discoLsnr;
+
+ /** Cluster snapshot operation requested by user. Guarded by {@code snpOpMux}. */
+ private ClusterSnapshotFuture clusterSnpFut;
+
+ /** Current snapshot operation on local node. */
+ private volatile SnapshotOperationRequest clusterSnpReq;
+
+ /** {@code true} if recovery process occurred for snapshot. */
+ private volatile boolean recovered;
+
+ /** Last seen cluster snapshot operation. */
+ private volatile ClusterSnapshotFuture lastSeenSnpFut = new ClusterSnapshotFuture();
+
+ /**
+ * Creates the snapshot manager and its distributed start/end snapshot processes.
+ *
+ * @param ctx Kernal context.
+ */
+ public IgniteSnapshotManager(GridKernalContext ctx) {
+ // One direct, native-ordered buffer of page size per thread, allocated lazily on first access.
+ locBuff = ThreadLocal.withInitial(() ->
+ ByteBuffer.allocateDirect(ctx.config().getDataStorageConfiguration().getPageSize())
+ .order(ByteOrder.nativeOrder()));
+
+ // Start stage: local handler, cluster-wide result handler, and custom discovery message factory.
+ startSnpProc = new DistributedProcess<>(ctx, START_SNAPSHOT, this::initLocalSnapshotStartStage,
+ this::processLocalSnapshotStartStageResult, SnapshotStartDiscoveryMessage::new);
+
+ // End stage: local handler and cluster-wide result handler.
+ endSnpProc = new DistributedProcess<>(ctx, END_SNAPSHOT, this::initLocalSnapshotEndStage,
+ this::processLocalSnapshotEndStageResult);
+ }
+
+ /**
+ * @param snapshotCacheDir Snapshot directory to store files.
+ * @param partId Cache partition identifier.
+ * @return A file representation.
+ */
+ public static File partDeltaFile(File snapshotCacheDir, int partId) {
+ return new File(snapshotCacheDir, partDeltaFileName(partId));
+ }
+
+ /**
+ * Maps a partition id to the name of its delta-pages file; the index partition has a fixed name.
+ *
+ * @param partId Partition id.
+ * @return File name of delta partition pages.
+ */
+ public static String partDeltaFileName(int partId) {
+ assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION;
+
+ if (partId == INDEX_PARTITION)
+ return INDEX_DELTA_NAME;
+
+ return String.format(PART_DELTA_TEMPLATE, partId);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void start0() throws IgniteCheckedException {
+ super.start0();
+
+ GridKernalContext ctx = cctx.kernalContext();
+
+ // Nothing to initialize on client nodes or when persistence is disabled;
+ // snpRunner and discoLsnr then stay null (stop0 checks for that).
+ if (ctx.clientNode())
+ return;
+
+ if (!CU.isPersistenceEnabled(ctx.config()))
+ return;
+
+ // Fixed-size pool for local snapshot work.
+ snpRunner = new IgniteThreadPoolExecutor(SNAPSHOT_RUNNER_THREAD_PREFIX,
+ cctx.igniteInstanceName(),
+ SNAPSHOT_THREAD_POOL_SIZE,
+ SNAPSHOT_THREAD_POOL_SIZE,
+ IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
+ new LinkedBlockingQueue<>(),
+ SYSTEM_POOL,
+ new OomExceptionHandler(ctx));
+
+ assert cctx.pageStore() instanceof FilePageStoreManager;
+
+ FilePageStoreManager storeMgr = (FilePageStoreManager)cctx.pageStore();
+
+ pdsSettings = cctx.kernalContext().pdsFolderResolver().resolveFolders();
+
+ // Snapshot root directory and the temporary directory under the page store work directory.
+ locSnpDir = resolveSnapshotWorkDirectory(ctx.config());
+ tmpWorkDir = U.resolveWorkDirectory(storeMgr.workDir().getAbsolutePath(), DFLT_SNAPSHOT_TMP_DIR, true);
+
+ U.ensureDirectory(locSnpDir, "snapshot work directory", log);
+ U.ensureDirectory(tmpWorkDir, "temp directory for snapshot creation", log);
+
+ // Metrics describing the last cluster snapshot seen by this node and the locally stored snapshots.
+ MetricRegistry mreg = cctx.kernalContext().metric().registry(SNAPSHOT_METRICS);
+
+ mreg.register("LastSnapshotStartTime", () -> lastSeenSnpFut.startTime,
+ "The system time of the last cluster snapshot request start time on this node.");
+ mreg.register("LastSnapshotEndTime", () -> lastSeenSnpFut.endTime,
+ "The system time of the last cluster snapshot request end time on this node.");
+ mreg.register("LastSnapshotName", () -> lastSeenSnpFut.name, String.class,
+ "The name of last started cluster snapshot request on this node.");
+ mreg.register("LastSnapshotErrorMessage",
+ () -> lastSeenSnpFut.error() == null ? "" : lastSeenSnpFut.error().getMessage(),
+ String.class,
+ "The error message of last started cluster snapshot request which fail with an error. " +
+ "This value will be empty if last snapshot request has been completed successfully.");
+ mreg.register("LocalSnapshotNames", this::localSnapshotNames, List.class,
+ "The list of names of all snapshots currently saved on the local node with respect to " +
+ "the configured via IgniteConfiguration snapshot working path.");
+
+ // Page store factories are supplied by the page store manager.
+ storeFactory = storeMgr::getPageStoreFactory;
+
+ cctx.exchange().registerExchangeAwareComponent(this);
+ ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
+
+ // Fail a local snapshot task when its initiator leaves, or when a baseline node of the current
+ // cluster snapshot request (for the same snapshot name) leaves or fails.
+ cctx.gridEvents().addDiscoveryEventListener(discoLsnr = (evt, discoCache) -> {
+ if (!busyLock.enterBusy())
+ return;
+
+ try {
+ UUID leftNodeId = evt.eventNode().id();
+
+ if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) {
+ SnapshotOperationRequest snpReq = clusterSnpReq;
+
+ for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+ if (sctx.sourceNodeId().equals(leftNodeId) ||
+ (snpReq != null &&
+ snpReq.snpName.equals(sctx.snapshotName()) &&
+ snpReq.bltNodes.contains(leftNodeId))) {
+ sctx.acceptException(new ClusterTopologyCheckedException("Snapshot operation interrupted. " +
+ "One of baseline nodes left the cluster: " + leftNodeId));
+ }
+ }
+ }
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void stop0(boolean cancel) {
+ // After block(), enterBusy() fails, so the discovery listener returns without doing anything.
+ busyLock.block();
+
+ try {
+ // Fail all local snapshot tasks that are still registered.
+ for (SnapshotFutureTask sctx : locSnpTasks.values())
+ sctx.acceptException(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
+
+ locSnpTasks.clear();
+
+ // Complete a pending user-requested cluster snapshot future with a node-stopping error.
+ synchronized (snpOpMux) {
+ if (clusterSnpFut != null) {
+ clusterSnpFut.onDone(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
+
+ clusterSnpFut = null;
+ }
+ }
+
+ // Both stay null when start0() returned early (client node or persistence disabled).
+ if (snpRunner != null)
+ snpRunner.shutdownNow();
+
+ if (discoLsnr != null)
+ cctx.kernalContext().event().removeDiscoveryEventListener(discoLsnr);
+
+ cctx.exchange().unregisterExchangeAwareComponent(this);
+ }
+ finally {
+ busyLock.unblock();
+ }
+ }
+
+ /**
+ * Deletes the local node's part of a snapshot: its binary metadata and database directories, plus all
+ * files under the snapshot marshaller directory. The snapshot directory itself is deleted only when
+ * its database folder is absent or is a readable, empty directory.
+ *
+ * @param snpDir Snapshot dir.
+ * @param folderName Local node folder name (see {@link U#maskForFileName} with consistent id).
+ * @throws IgniteException If snapshot directories cannot be resolved or the marshaller directory traversal fails.
+ */
+ public static void deleteSnapshot(File snpDir, String folderName) {
+ if (!snpDir.exists())
+ return;
+
+ assert snpDir.isDirectory() : snpDir;
+
+ try {
+ File binDir = resolveBinaryWorkDir(snpDir.getAbsolutePath(), folderName);
+ File dbDir = U.resolveWorkDirectory(snpDir.getAbsolutePath(), databaseRelativePath(folderName), false);
+
+ U.delete(binDir);
+ U.delete(dbDir);
+
+ File marshDir = mappingFileStoreWorkDir(snpDir.getAbsolutePath());
+
+ // Sequentially traverse the snapshot marshaller directory and delete all files, tolerating
+ // files which are removed concurrently from the tree.
+ Files.walkFileTree(marshDir.toPath(), new SimpleFileVisitor<Path>() {
+ @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
+ U.delete(file);
+
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override public FileVisitResult visitFileFailed(Path file, IOException exc) {
+ // Skip files which can be concurrently removed from FileTree.
+ return FileVisitResult.CONTINUE;
+ }
+ });
+
+ File db = new File(snpDir, DB_DEFAULT_FOLDER);
+
+ if (!db.exists())
+ U.delete(snpDir);
+ else {
+ // File#list() returns null on an I/O error or when 'db' is not a directory. The remaining
+ // content is unknown then, so keep the snapshot directory instead of failing with an NPE.
+ String[] dbFiles = db.list();
+
+ if (dbFiles != null && dbFiles.length == 0)
+ U.delete(snpDir);
+ }
+ }
+ catch (IOException | IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * Resolves the local directory of a snapshot by its name inside the snapshot root directory.
+ *
+ * @param snpName Snapshot name.
+ * @return Local snapshot directory for snapshot with given name.
+ */
+ public File snapshotLocalDir(String snpName) {
+ assert locSnpDir != null;
+ assert U.alphanumericUnderscore(snpName) : snpName;
+
+ File snpDir = new File(locSnpDir, snpName);
+
+ return snpDir;
+ }
+
+ /**
+ * Returns the temporary snapshot working directory initialized on node start.
+ *
+ * @return Node snapshot working directory.
+ */
+ public File snapshotTmpDir() {
+ File dir = tmpWorkDir;
+
+ assert dir != null;
+
+ return dir;
+ }
+
+ /**
+ * Local stage of the distributed snapshot start process. Validates the request and registers a local
+ * snapshot task for the requested cache groups which have a cache group context on this node.
+ * <p>
+ * The result is a finished future without a response in these cases:
+ * <ul>
+ * <li>client nodes and non-baseline nodes;</li>
+ * <li>none of the requested groups has a local context.</li>
+ * </ul>
+ * The result is a finished future with an error in these cases:
+ * <ul>
+ * <li>another snapshot operation is in progress;</li>
+ * <li>a baseline node of the request has left;</li>
+ * <li>a requested group has no descriptor.</li>
+ * </ul>
+ *
+ * @param req Request on snapshot creation.
+ * @return Future completed with a response when the local snapshot task completes successfully, or with its error.
+ */
+ private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotStartStage(SnapshotOperationRequest req) {
+ if (cctx.kernalContext().clientNode() ||
+ !CU.baselineNode(cctx.localNode(), cctx.kernalContext().state().clusterState()))
+ return new GridFinishedFuture<>();
+
+ // Executed inside discovery notifier thread, prior to firing discovery custom event,
+ // so it is safe to set new snapshot task inside this method without synchronization.
+ if (clusterSnpReq != null) {
+ return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot operation has been rejected. " +
+ "Another snapshot operation in progress [req=" + req + ", curr=" + clusterSnpReq + ']'));
+ }
+
+ // Every baseline node of the request must still be a server node in the current topology.
+ Set<UUID> leftNodes = new HashSet<>(req.bltNodes);
+ leftNodes.removeAll(F.viewReadOnly(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
+ F.node2id()));
+
+ if (!leftNodes.isEmpty()) {
+ return new GridFinishedFuture<>(new IgniteCheckedException("Some of baseline nodes left the cluster " +
+ "prior to snapshot operation start: " + leftNodes));
+ }
+
+ // Every requested group must have a cache group descriptor.
+ Set<Integer> leftGrps = new HashSet<>(req.grpIds);
+ leftGrps.removeAll(cctx.cache().cacheGroupDescriptors().keySet());
+
+ if (!leftGrps.isEmpty()) {
+ return new GridFinishedFuture<>(new IgniteCheckedException("Some of requested cache groups doesn't exist " +
+ "on the local node [missed=" + leftGrps + ", nodeId=" + cctx.localNodeId() + ']'));
+ }
+
+ Map<Integer, Set<Integer>> parts = new HashMap<>();
+
+ // Prepare collection of pairs group and appropriate cache partition to be snapshot.
+ // Cache group context may be 'null' on some nodes e.g. a node filter is set.
+ for (Integer grpId : req.grpIds) {
+ if (cctx.cache().cacheGroup(grpId) == null)
+ continue;
+
+ // A null partition set presumably means "all partitions of the group" — TODO confirm in registerSnapshotTask.
+ parts.put(grpId, null);
+ }
+
+ if (parts.isEmpty())
+ return new GridFinishedFuture<>();
+
+ SnapshotFutureTask task0 = registerSnapshotTask(req.snpName,
+ req.srcNodeId,
+ parts,
+ locSndrFactory.apply(req.snpName));
+
+ // Remember the request. Later requests are rejected above, and the discovery listener uses it
+ // to fail the task when a baseline node leaves.
+ clusterSnpReq = req;
+
+ return task0.chain(fut -> {
+ if (fut.error() == null)
+ return new SnapshotOperationResponse();
+ else
+ throw new GridClosureException(fut.error());
+ });
+ }
+
+ /**
+ * Handles the results of the snapshot start stage collected from the cluster nodes.
+ * <p>
+ * If the results do not belong to the snapshot request accepted on this node, the local cluster-wide
+ * snapshot future with the same request id (if any) is failed and released. Otherwise, on the
+ * coordinator only, the request is marked as failed when some nodes reported errors or some baseline
+ * nodes returned nothing, and the snapshot end stage is started.
+ *
+ * @param id Request id.
+ * @param res Results, keyed by node id.
+ * @param err Errors, keyed by node id.
+ */
+ private void processLocalSnapshotStartStageResult(UUID id, Map<UUID, SnapshotOperationResponse> res, Map<UUID, Exception> err) {
+ // Client nodes do not take part in snapshot creation.
+ if (cctx.kernalContext().clientNode())
+ return;
+
+ SnapshotOperationRequest snpReq = clusterSnpReq;
+
+ // No request with this id was accepted locally (e.g. it was rejected on the start stage):
+ // fail the user future if the operation was initiated on this node.
+ if (snpReq == null || !snpReq.rqId.equals(id)) {
+ synchronized (snpOpMux) {
+ if (clusterSnpFut != null && clusterSnpFut.rqId.equals(id)) {
+ clusterSnpFut.onDone(new IgniteCheckedException("Snapshot operation has not been fully completed " +
+ "[err=" + err + ", snpReq=" + snpReq + ']'));
+
+ clusterSnpFut = null;
+ }
+
+ return;
+ }
+ }
+
+ // Only the coordinator decides whether the start stage succeeded and starts the end stage.
+ if (isLocalNodeCoordinator(cctx.discovery())) {
+ // Baseline nodes which returned neither a result nor an error (e.g. they left the cluster).
+ Set<UUID> missed = new HashSet<>(snpReq.bltNodes);
+ missed.removeAll(res.keySet());
+ missed.removeAll(err.keySet());
+
+ // The flag travels with the request to the end stage, where it triggers snapshot deletion.
+ snpReq.hasErr = !F.isEmpty(err) || !missed.isEmpty();
+
+ if (snpReq.hasErr) {
+ U.warn(log, "Execution of local snapshot tasks fails or them haven't been executed " +
+ "due to some of nodes left the cluster. Uncompleted snapshot will be deleted " +
+ "[err=" + err + ", missed=" + missed + ']');
+ }
+
+ endSnpProc.start(UUID.randomUUID(), snpReq);
+ }
+ }
+
+ /**
+ * Finalizes the local part of the snapshot. If any node reported an error on the start stage, the
+ * local snapshot directory is deleted. The snapshot "running" marker is then removed from the metastorage.
+ * Nodes without an accepted snapshot request just report success.
+ *
+ * @param req Request on snapshot creation.
+ * @return Future which will be completed when the snapshot will be finalized.
+ */
+ private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotEndStage(SnapshotOperationRequest req) {
+ if (clusterSnpReq != null) {
+ try {
+ if (req.hasErr)
+ deleteSnapshot(snapshotLocalDir(req.snpName), pdsSettings.folderName());
+
+ removeLastMetaStorageKey();
+ }
+ catch (Exception e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ return new GridFinishedFuture<>(new SnapshotOperationResponse());
+ }
+
+ /**
+ * Handles the results of the snapshot end stage. The current snapshot request is reset. If this node
+ * holds a cluster-wide snapshot future, that future is completed and released: successfully when every
+ * baseline node returned a result and the start stage reported no errors, and with an error otherwise.
+ *
+ * @param id Request id.
+ * @param res Results, keyed by node id.
+ * @param err Errors, keyed by node id.
+ */
+ private void processLocalSnapshotEndStageResult(UUID id, Map<UUID, SnapshotOperationResponse> res, Map<UUID, Exception> err) {
+ SnapshotOperationRequest snpReq = clusterSnpReq;
+
+ if (snpReq == null)
+ return;
+
+ // Baseline nodes which have not returned a result of the end stage.
+ Set<UUID> endFail = new HashSet<>(snpReq.bltNodes);
+ endFail.removeAll(res.keySet());
+
+ // The operation is over on this node; new snapshot requests are no longer rejected by the start stage.
+ clusterSnpReq = null;
+
+ // NOTE(review): unlike the start stage handler, neither the request nor the user future is matched
+ // against the given id here - confirm this is intended.
+ synchronized (snpOpMux) {
+ if (clusterSnpFut != null) {
+ if (endFail.isEmpty() && !snpReq.hasErr) {
+ clusterSnpFut.onDone();
+
+ if (log.isInfoEnabled())
+ log.info("Cluster-wide snapshot operation finished successfully [req=" + snpReq + ']');
+ }
+ else {
+ clusterSnpFut.onDone(new IgniteCheckedException("Snapshot creation has been finished with an error. " +
+ "Local snapshot tasks may not finished completely or finalizing results fails " +
+ "[hasErr=" + snpReq.hasErr + ", fail=" + endFail + ", err=" + err + ']'));
+ }
+
+ clusterSnpFut = null;
+ }
+ }
+ }
+
+ /**
+ * @return {@code True} if snapshot operation is in progress.
+ */
+ public boolean isSnapshotCreating() {
+ if (clusterSnpReq != null)
+ return true;
+
+ synchronized (snpOpMux) {
+ return clusterSnpReq != null || clusterSnpFut != null;
+ }
+ }
+
+ /**
+ * @return List of all known snapshots on the local node.
+ */
+ public List<String> localSnapshotNames() {
+ if (cctx.kernalContext().clientNode())
+ throw new UnsupportedOperationException("Client and daemon nodes can not perform this operation.");
+
+ synchronized (snpOpMux) {
+ return Arrays.stream(locSnpDir.listFiles(File::isDirectory))
+ .map(File::getName)
+ .collect(Collectors.toList());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> createSnapshot(String name) {
+ A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+ A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+ // Future registered by this call as the current cluster-wide snapshot operation, if it gets that far.
+ // It must be released if the operation then fails to start. Otherwise every subsequent request would
+ // be rejected as if the previous snapshot operation was still in progress.
+ ClusterSnapshotFuture snpFut0 = null;
+
+ try {
+ if (cctx.kernalContext().clientNode())
+ throw new UnsupportedOperationException("Client and daemon nodes can not perform this operation.");
+
+ if (!IgniteFeatures.allNodesSupports(cctx.discovery().allNodes(), PERSISTENCE_CACHE_SNAPSHOT))
+ throw new IgniteException("Not all nodes in the cluster support a snapshot operation.");
+
+ // Use a single cluster state instance for both the activity and the baseline checks.
+ DiscoveryDataClusterState clusterState = cctx.kernalContext().state().clusterState();
+
+ if (!active(clusterState.state()))
+ throw new IgniteException("Snapshot operation has been rejected. The cluster is inactive.");
+
+ if (!clusterState.hasBaselineTopology())
+ throw new IgniteException("Snapshot operation has been rejected. The baseline topology is not configured for cluster.");
+
+ synchronized (snpOpMux) {
+ if (clusterSnpFut != null && !clusterSnpFut.isDone())
+ throw new IgniteException("Create snapshot request has been rejected. The previous snapshot operation was not completed.");
+
+ if (clusterSnpReq != null)
+ throw new IgniteException("Create snapshot request has been rejected. Parallel snapshot processes are not allowed.");
+
+ if (localSnapshotNames().contains(name))
+ throw new IgniteException("Create snapshot request has been rejected. Snapshot with given name already exists on local node.");
+
+ snpFut0 = new ClusterSnapshotFuture(UUID.randomUUID(), name);
+
+ clusterSnpFut = snpFut0;
+ lastSeenSnpFut = snpFut0;
+ }
+
+ // Only persistent cache groups of USER caches without encryption are included into the snapshot.
+ List<Integer> grps = cctx.cache().persistentGroups().stream()
+ .filter(g -> cctx.cache().cacheType(g.cacheOrGroupName()) == CacheType.USER)
+ .filter(g -> !g.config().isEncryptionEnabled())
+ .map(CacheGroupDescriptor::groupId)
+ .collect(Collectors.toList());
+
+ List<ClusterNode> srvNodes = cctx.discovery().serverNodes(AffinityTopologyVersion.NONE);
+
+ // The request lists the ids of the current baseline server nodes which must take part in the operation.
+ startSnpProc.start(snpFut0.rqId, new SnapshotOperationRequest(snpFut0.rqId,
+ cctx.localNodeId(),
+ name,
+ grps,
+ new HashSet<>(F.viewReadOnly(srvNodes,
+ F.node2id(),
+ (node) -> CU.baselineNode(node, clusterState)))));
+
+ if (log.isInfoEnabled())
+ log.info("Cluster-wide snapshot operation started [snpName=" + name + ", grps=" + grps + ']');
+
+ return new IgniteFutureImpl<>(snpFut0);
+ }
+ catch (Exception e) {
+ U.error(log, "Start snapshot operation failed", e);
+
+ if (snpFut0 != null) {
+ // Release the operation slot taken above, so that new snapshot requests are not rejected forever.
+ synchronized (snpOpMux) {
+ if (clusterSnpFut == snpFut0)
+ clusterSnpFut = null;
+ }
+
+ snpFut0.onDone(e);
+ }
+
+ lastSeenSnpFut = new ClusterSnapshotFuture(name, e);
+
+ return new IgniteFinishedFutureImpl<>(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReadyForReadWrite(ReadWriteMetastorage metaStorage) throws IgniteCheckedException {
+ synchronized (snpOpMux) {
+ this.metaStorage = metaStorage;
+
+ if (recovered)
+ removeLastMetaStorageKey();
+
+ recovered = false;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReadyForRead(ReadOnlyMetastorage metaStorage) throws IgniteCheckedException {
+ // Snapshot which has not been completed due to the local node crashed must be deleted.
+ String snpName = (String)metaStorage.read(SNP_RUNNING_KEY);
+
+ if (snpName == null)
+ return;
+
+ recovered = true;
+
+ for (File tmp : snapshotTmpDir().listFiles())
+ U.delete(tmp);
+
+ deleteSnapshot(snapshotLocalDir(snpName), pdsSettings.folderName());
+
+ if (log.isInfoEnabled()) {
+ log.info("Previous attempt to create snapshot fail due to the local node crash. All resources " +
+ "related to snapshot operation have been deleted: " + snpName);
+ }
+ }
+
+ /**
+ * Checks whether the given discovery event is a custom event from a non-client node which carries
+ * a snapshot start message.
+ *
+ * @param evt Discovery event to check.
+ * @return {@code true} if exchange started by snapshot operation.
+ */
+ public static boolean isSnapshotOperation(DiscoveryEvent evt) {
+ if (evt.eventNode().isClient())
+ return false;
+
+ if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT)
+ return false;
+
+ return ((DiscoveryCustomEvent)evt).customMessage() instanceof SnapshotStartDiscoveryMessage;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ if (cctx.kernalContext().clientNode())
+ return;
+
+ // Read the current request exactly once. processLocalSnapshotEndStageResult resets it to null,
+ // so a second read after a null check could observe null and fail with NPE.
+ SnapshotOperationRequest snpReq = clusterSnpReq;
+
+ if (snpReq == null)
+ return;
+
+ SnapshotFutureTask task = locSnpTasks.get(snpReq.snpName);
+
+ if (task == null)
+ return;
+
+ if (task.start()) {
+ cctx.database().forceCheckpoint(String.format("Start snapshot operation: %s", snpReq.snpName));
+
+ // Schedule task on a checkpoint and wait when it starts.
+ try {
+ task.awaitStarted();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Fail to wait while cluster-wide snapshot operation started", e);
+ }
+ }
+ }
+
+ /**
+ * Fails every local snapshot task which includes at least one of the given cache groups.
+ *
+ * @param grps List of cache groups which will be destroyed.
+ */
+ public void onCacheGroupsStopped(List<Integer> grps) {
+ for (SnapshotFutureTask task : locSnpTasks.values()) {
+ Set<Integer> stopped = new HashSet<>(grps);
+
+ stopped.retainAll(task.affectedCacheGroups());
+
+ if (stopped.isEmpty())
+ continue;
+
+ task.acceptException(new IgniteCheckedException("Snapshot has been interrupted due to some of the required " +
+ "cache groups stopped: " + stopped));
+ }
+ }
+
+ /**
+ * Registers a new local snapshot task under the given name. If the manager is stopping, or a task with
+ * the same name is already registered, an already failed task is returned instead. A registered task
+ * is removed from the local task map once it completes.
+ *
+ * @param snpName Unique snapshot name.
+ * @param srcNodeId Node id which cause snapshot operation.
+ * @param parts Collection of pairs group and appropriate cache partition to be snapshot.
+ * @param snpSndr Factory which produces snapshot receiver instance.
+ * @return Snapshot operation task which should be registered on checkpoint to run.
+ */
+ SnapshotFutureTask registerSnapshotTask(
+ String snpName,
+ UUID srcNodeId,
+ Map<Integer, Set<Integer>> parts,
+ SnapshotSender snpSndr
+ ) {
+ if (!busyLock.enterBusy())
+ return new SnapshotFutureTask(new IgniteCheckedException("Snapshot manager is stopping [locNodeId=" + cctx.localNodeId() + ']'));
+
+ try {
+ // Cheap pre-check which avoids creating a task for an already scheduled snapshot.
+ if (locSnpTasks.containsKey(snpName))
+ return new SnapshotFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " + snpName));
+
+ SnapshotFutureTask snpFutTask = new SnapshotFutureTask(cctx,
+ srcNodeId,
+ snpName,
+ tmpWorkDir,
+ ioFactory,
+ snpSndr,
+ parts,
+ locBuff);
+
+ // putIfAbsent also rejects a task with the same name registered between the check above and this call.
+ SnapshotFutureTask prev = locSnpTasks.putIfAbsent(snpName, snpFutTask);
+
+ if (prev != null)
+ return new SnapshotFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " + snpName));
+
+ if (log.isInfoEnabled()) {
+ log.info("Snapshot task has been registered on local node [sctx=" + snpFutTask +
+ ", topVer=" + cctx.discovery().topologyVersionEx() + ']');
+ }
+
+ snpFutTask.listen(f -> locSnpTasks.remove(snpName));
+
+ return snpFutTask;
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Replaces the factory used by the start stage to create the sender of a local snapshot task.
+ * The factory receives the snapshot name.
+ *
+ * @param factory Factory which produces {@link LocalSnapshotSender} implementation.
+ */
+ void localSnapshotSenderFactory(Function<String, SnapshotSender> factory) {
+ locSndrFactory = factory;
+ }
+
+ /**
+ * @return Factory which produces {@link LocalSnapshotSender} implementation; it takes the snapshot
+ * name as its argument.
+ */
+ Function<String, SnapshotSender> localSnapshotSenderFactory() {
+ return locSndrFactory;
+ }
+
+ /**
+ * Removes the {@code SNP_RUNNING_KEY} marker from the metastorage, under the checkpoint read lock.
+ * Called when the snapshot has finished, or when the leftovers of an interrupted snapshot have
+ * already been cleaned up, so the key is no longer needed.
+ * <p>
+ * NOTE(review): assumes {@code metaStorage} has already been set by onReadyForReadWrite.
+ *
+ * @throws IgniteCheckedException If the metastorage removal fails.
+ */
+ private void removeLastMetaStorageKey() throws IgniteCheckedException {
+ cctx.database().checkpointReadLock();
+
+ try {
+ metaStorage.remove(SNP_RUNNING_KEY);
+ }
+ finally {
+ cctx.database().checkpointReadUnlock();
+ }
+ }
+
+ /**
+ * Returns the executor on which snapshot tasks are run.
+ *
+ * @return The executor used to run snapshot tasks.
+ */
+ Executor snapshotExecutorService() {
+ Executor exec = snpRunner;
+
+ assert exec != null;
+
+ return exec;
+ }
+
+ /**
+ * Replaces the factory used to open files during snapshot operations. It is used by the local snapshot
+ * sender to copy files and is passed to newly registered snapshot tasks.
+ *
+ * @param ioFactory Factory to create IO interface over a page stores.
+ */
+ void ioFactory(FileIOFactory ioFactory) {
+ this.ioFactory = ioFactory;
+ }
+
+ /**
+ * Builds the path of the node persistence folder relative to a snapshot directory, i.e.
+ * {@code <DB_DEFAULT_FOLDER>/<folderName>}.
+ * Example: {@code db/IgniteNodeName0}, resolved inside {@code snapshotWorkDir/<snpName>}.
+ *
+ * @param folderName Node persistence folder name.
+ * @return Relative configured path of persistence data storage directory for the local node.
+ */
+ static String databaseRelativePath(String folderName) {
+ return Paths.get(DB_DEFAULT_FOLDER, folderName).toString();
+ }
+
+ /**
+ * Resolves the snapshot root directory from the configured snapshot path against the configured
+ * work directory.
+ *
+ * @param cfg Ignite configuration.
+ * @return Snapshot directory resolved through given configuration.
+ * @throws IgniteException If the directory cannot be resolved.
+ */
+ static File resolveSnapshotWorkDirectory(IgniteConfiguration cfg) {
+ String workDir = cfg.getWorkDirectory();
+ String snpPath = cfg.getSnapshotPath();
+
+ try {
+ return U.resolveWorkDirectory(workDir, snpPath, false);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * Copies the first {@code length} bytes of the source file into the destination file. The destination
+ * is opened with {@link FileOutputStream}, so it is created or truncated.
+ *
+ * @param factory Factory to produce FileIO access.
+ * @param from Copy from file.
+ * @param to Copy data to file.
+ * @param length Number of bytes to copy from beginning.
+ * @throws IgniteException If the source file is shorter than {@code length} or an I/O error occurs.
+ */
+ static void copy(FileIOFactory factory, File from, File to, long length) {
+ try (FileIO src = factory.create(from, READ);
+ FileChannel dest = new FileOutputStream(to).getChannel()) {
+ long srcSize = src.size();
+
+ if (srcSize < length) {
+ throw new IgniteException("The source file to copy has not enough length " +
+ "[expected=" + length + ", actual=" + srcSize + ']');
+ }
+
+ src.position(0);
+
+ long written = 0;
+
+ // transferTo may move fewer bytes than requested, so loop until the whole range is copied.
+ while (written < length)
+ written += src.transferTo(written, length - written, dest);
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * Snapshot sender which writes all data to local directory.
+ * <p>
+ * Data goes to the snapshot's local directory ({@code snapshotLocalDir(snpName)}). Cache configurations
+ * and partitions are stored in the node database subdirectory ({@code databaseRelativePath(folderName)}).
+ * Marshaller mappings and binary metadata are saved relative to the snapshot directory.
+ * {@link #init(int)} records the snapshot name in the metastorage under {@code SNP_RUNNING_KEY}.
+ * If the sender is closed with an error, the partially written snapshot is deleted.
+ */
+ private class LocalSnapshotSender extends SnapshotSender {
+ /** Snapshot name. */
+ private final String snpName;
+
+ /** Local snapshot directory. */
+ private final File snpLocDir;
+
+ /** Local node snapshot directory calculated on snapshot directory. Initialized in {@link #init(int)}. */
+ private File dbDir;
+
+ /** Size of page. */
+ private final int pageSize;
+
+ /**
+ * @param snpName Snapshot name.
+ */
+ public LocalSnapshotSender(String snpName) {
+ super(IgniteSnapshotManager.this.log, snpRunner);
+
+ this.snpName = snpName;
+ snpLocDir = snapshotLocalDir(snpName);
+ pageSize = cctx.kernalContext().config().getDataStorageConfiguration().getPageSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void init(int partsCnt) {
+ dbDir = new File(snpLocDir, databaseRelativePath(pdsSettings.folderName()));
+
+ if (dbDir.exists()) {
+ throw new IgniteException("Snapshot with given name already exists " +
+ "[snpName=" + snpName + ", absPath=" + dbDir.getAbsolutePath() + ']');
+ }
+
+ cctx.database().checkpointReadLock();
+
+ try {
+ assert metaStorage != null && metaStorage.read(SNP_RUNNING_KEY) == null :
+ "The previous snapshot hasn't been completed correctly";
+
+ // Mark the snapshot as running, so that it is deleted on the next node start if the node crashes.
+ metaStorage.write(SNP_RUNNING_KEY, snpName);
+
+ U.ensureDirectory(dbDir, "snapshot work directory", log);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ finally {
+ cctx.database().checkpointReadUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendCacheConfig0(File ccfg, String cacheDirName) {
+ assert dbDir != null;
+
+ try {
+ File cacheDir = U.resolveWorkDirectory(dbDir.getAbsolutePath(), cacheDirName, false);
+
+ copy(ioFactory, ccfg, new File(cacheDir, ccfg.getName()), ccfg.length());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMarshallerMeta0(List<Map<Integer, MappedName>> mappings) {
+ if (mappings == null)
+ return;
+
+ saveMappings(cctx.kernalContext(), mappings, snpLocDir);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendBinaryMeta0(Collection<BinaryType> types) {
+ if (types == null)
+ return;
+
+ cctx.kernalContext().cacheObjects().saveMetadata(types, snpLocDir);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long len) {
+ try {
+ // Nothing to copy for a partition with zero captured length.
+ if (len == 0)
+ return;
+
+ File cacheDir = U.resolveWorkDirectory(dbDir.getAbsolutePath(), cacheDirName, false);
+
+ File snpPart = new File(cacheDir, part.getName());
+
+ if (!snpPart.exists() || snpPart.delete())
+ snpPart.createNewFile();
+
+ // Copy exactly the captured length, which may differ from the current length of the live file.
+ copy(ioFactory, part, snpPart, len);
+
+ if (log.isInfoEnabled()) {
+ log.info("Partition has been snapshot [snapshotDir=" + dbDir.getAbsolutePath() +
+ ", cacheDirName=" + cacheDirName + ", part=" + part.getName() +
+ ", length=" + len + ", snapshot=" + snpPart.getName() + ']');
+ }
+ }
+ catch (IOException | IgniteCheckedException ex) {
+ throw new IgniteException(ex);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair) {
+ File snpPart = getPartitionFile(dbDir, cacheDirName, pair.getPartitionId());
+
+ if (log.isInfoEnabled()) {
+ log.info("Start partition snapshot recovery with the given delta page file [part=" + snpPart +
+ ", delta=" + delta + ']');
+ }
+
+ try (FileIO fileIo = ioFactory.create(delta, READ);
+ FilePageStore pageStore = (FilePageStore)storeFactory
+ .apply(pair.getGroupId(), false)
+ .createPageStore(getFlagByPartId(pair.getPartitionId()),
+ snpPart::toPath,
+ new LongAdderMetric("NO_OP", null))
+ ) {
+ ByteBuffer pageBuf = ByteBuffer.allocate(pageSize)
+ .order(ByteOrder.nativeOrder());
+
+ long totalBytes = fileIo.size();
+
+ assert totalBytes % pageSize == 0 : "Given file with delta pages has incorrect size: " + fileIo.size();
+
+ pageStore.beginRecover();
+
+ // Apply every page of the delta file on top of the copied partition file.
+ for (long pos = 0; pos < totalBytes; pos += pageSize) {
+ long read = fileIo.readFully(pageBuf, pos);
+
+ // Fail explicitly rather than only under assertions: a partially read page must never be applied.
+ if (read != pageBuf.capacity()) {
+ throw new IOException("Unable to read a full page from the delta file [path=" + delta.getAbsolutePath() +
+ ", pos=" + pos + ", read=" + read + ", pageSize=" + pageSize + ']');
+ }
+
+ pageBuf.flip();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Read page given delta file [path=" + delta.getName() +
+ ", pageId=" + PageIO.getPageId(pageBuf) + ", pos=" + pos + ", pages=" + (totalBytes / pageSize) +
+ ", crcBuff=" + FastCrc.calcCrc(pageBuf, pageBuf.limit()) + ", crcPage=" + PageIO.getCrc(pageBuf) + ']');
+
+ pageBuf.rewind();
+ }
+
+ pageStore.write(PageIO.getPageId(pageBuf), pageBuf, 0, false);
+
+ // Reset the buffer for the next page, wherever the page store left its position.
+ pageBuf.clear();
+ }
+
+ pageStore.finishRecover();
+ }
+ catch (IOException | IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void close0(@Nullable Throwable th) {
+ if (th == null) {
+ if (log.isInfoEnabled())
+ log.info("Local snapshot sender closed, resources released [dbNodeSnpDir=" + dbDir + ']');
+ }
+ else {
+ // Do not leave a partially written snapshot behind.
+ deleteSnapshot(snpLocDir, pdsSettings.folderName());
+
+ U.warn(log, "Local snapshot sender closed due to an error occurred", th);
+ }
+ }
+ }
+
+ /**
+ * Snapshot operation request. It is the payload of the {@link DistributedProcess} initiate message
+ * of the start stage, and the coordinator passes the same request to the end stage.
+ */
+ private static class SnapshotOperationRequest implements Serializable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Unique snapshot request id. */
+ private final UUID rqId;
+
+ /** Source node id which trigger request. */
+ private final UUID srcNodeId;
+
+ /** Snapshot name. */
+ private final String snpName;
+
+ /** The list of cache groups to include into snapshot. */
+ @GridToStringInclude
+ private final List<Integer> grpIds;
+
+ /** Ids of the baseline nodes which are expected to take part in the snapshot operation. */
+ @GridToStringInclude
+ private final Set<UUID> bltNodes;
+
+ /**
+ * {@code true} if an execution of local snapshot tasks failed with an error. Set by the coordinator
+ * after the start stage; on the end stage it makes nodes delete the local snapshot.
+ */
+ private volatile boolean hasErr;
+
+ /**
+ * @param rqId Unique snapshot request id.
+ * @param srcNodeId Id of the node which initiated the request.
+ * @param snpName Snapshot name.
+ * @param grpIds Cache groups to include into snapshot.
+ * @param bltNodes Ids of the baseline nodes affected by the snapshot operation.
+ */
+ public SnapshotOperationRequest(UUID rqId, UUID srcNodeId, String snpName, List<Integer> grpIds, Set<UUID> bltNodes) {
+ this.rqId = rqId;
+ this.srcNodeId = srcNodeId;
+ this.snpName = snpName;
+ this.grpIds = grpIds;
+ this.bltNodes = bltNodes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SnapshotOperationRequest.class, this);
+ }
+ }
+
+ /**
+ * Empty result of a local snapshot stage. It carries no data: a node reports success by returning it,
+ * while failures are reported as exceptions.
+ */
+ private static class SnapshotOperationResponse implements Serializable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+ }
+
+ /**
+ * Snapshot operation start message. It initiates the start stage of the distributed snapshot process
+ * ({@code START_SNAPSHOT}) and requests an exchange without partition assignment.
+ */
+ private static class SnapshotStartDiscoveryMessage extends InitMessage<SnapshotOperationRequest>
+ implements SnapshotDiscoveryMessage {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param processId Unique process id.
+ * @param req Snapshot initial request.
+ */
+ public SnapshotStartDiscoveryMessage(
+ UUID processId,
+ SnapshotOperationRequest req
+ ) {
+ super(processId, START_SNAPSHOT, req);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean needExchange() {
+ // The snapshot task is started from the exchange (see onDoneBeforeTopologyUnlock).
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean needAssignPartitions() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SnapshotStartDiscoveryMessage.class, this);
+ }
+ }
+
+ /**
+ * Future of a cluster-wide snapshot operation. It also records the snapshot name and the start and
+ * end times of the operation.
+ */
+ private static class ClusterSnapshotFuture extends GridFutureAdapter<Void> {
+ /** Unique snapshot request id; {@code null} for futures created already completed. */
+ private final UUID rqId;
+
+ /** Snapshot name. */
+ private final String name;
+
+ /** Snapshot start time. */
+ private final long startTime;
+
+ /** Snapshot finish time, recorded when the future is completed. */
+ private volatile long endTime;
+
+ /**
+ * Default constructor. Creates an already completed future without a request id, with an empty
+ * name and zero start and end times.
+ */
+ public ClusterSnapshotFuture() {
+ onDone();
+
+ rqId = null;
+ name = "";
+ startTime = 0;
+ // Resets the end time that onDone() above may have recorded through the override below.
+ endTime = 0;
+ }
+
+ /**
+ * Creates a future which is already completed with the error of starting a snapshot operation.
+ *
+ * @param name Snapshot name.
+ * @param err Error starting snapshot operation.
+ */
+ public ClusterSnapshotFuture(String name, Exception err) {
+ onDone(err);
+
+ this.name = name;
+ startTime = U.currentTimeMillis();
+ // NOTE(review): this resets the end time recorded by onDone(err) above, so a failed future
+ // reports a start time but an end time of 0 - confirm this is intended.
+ endTime = 0;
+ rqId = null;
+ }
+
+ /**
+ * Creates a future for a snapshot operation which is being started now.
+ *
+ * @param rqId Unique snapshot request id.
+ * @param name Snapshot name.
+ */
+ public ClusterSnapshotFuture(UUID rqId, String name) {
+ this.rqId = rqId;
+ this.name = name;
+ startTime = U.currentTimeMillis();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
+ // Record the end time before delegating completion to the parent future.
+ endTime = U.currentTimeMillis();
+
+ return super.onDone(res, err, cancel);
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
new file mode 100644
index 0000000..93270e2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -0,0 +1,1010 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.pagemem.store.PageWriteListener;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheWorkDir;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.copy;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.partDeltaFile;
+
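+/**
+ * Local snapshot task for the partitions of the requested cache groups. As a {@link DbCheckpointListener}
+ * its work is scheduled on a checkpoint, and as a {@link GridFutureAdapter} it is completed when the task
+ * finishes. The collected snapshot data is handed to a {@link SnapshotSender}.
+ */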
+class SnapshotFutureTask extends GridFutureAdapter<Boolean> implements DbCheckpointListener {
+ /** Shared context. */
+ private final GridCacheSharedContext<?, ?> cctx;
+
+ /** File page store manager for accessing cache group associated files. */
+ private final FilePageStoreManager pageStore;
+
+ /** Ignite logger. */
+ private final IgniteLogger log;
+
+ /** Id of the node which caused the snapshot operation. */
+ private final UUID srcNodeId;
+
+ /** Snapshot name; also the unique identifier of the snapshot process. */
+ private final String snpName;
+
+ /** Temporary working directory of this snapshot on the file system ({@code <tmpWorkDir>/<snpName>}). */
+ private final File tmpSnpWorkDir;
+
+ /** Local buffer to perform copy-on-write operations for {@link PageStoreSerialWriter}. */
+ private final ThreadLocal<ByteBuffer> locBuff;
+
+ /** IO factory which will be used for creating snapshot delta-writers. */
+ private final FileIOFactory ioFactory;
+
+ /**
+ * The file length captured for each cache partition file.
+ * The value is greater than zero only for partitions in OWNING state.
+ * The information is collected under the checkpoint write lock.
+ */
+ private final Map<GroupPartitionId, Long> partFileLengths = new HashMap<>();
+
+ /**
+ * Map of partitions to snapshot and their corresponding delta page store writers.
+ * Writers are pinned to the snapshot context due to controlling partition
+ * processing supplier.
+ */
+ private final Map<GroupPartitionId, PageStoreSerialWriter> partDeltaWriters = new HashMap<>();
+
+ /**
+ * List of cache configuration senders. Each sender is associated with a particular cache
+ * configuration file and monitors its changes (e.g. via SQL add/drop column or SQL index
+ * create/drop operations).
+ */
+ private final List<CacheConfigurationSender> ccfgSndrs = new CopyOnWriteArrayList<>();
+
+ /** Snapshot data sender. */
+ @GridToStringExclude
+ private final SnapshotSender snpSndr;
+
+ /**
+ * Requested map of cache groups and their partitions to include into the snapshot. If the set of
+ * partitions is {@code null}, then all OWNING partitions of the given cache group are included.
+ * In this case, if all partitions are in OWNING state, the index partition is also included.
+ * <p>
+ * If partitions for a particular cache group are not provided, they are collected and added
+ * on checkpoint under the write lock.
+ */
+ private final Map<Integer, Set<Integer>> parts;
+
+ /** Cache group and corresponding partitions collected under the checkpoint write lock. */
+ private final Map<Integer, Set<Integer>> processed = new HashMap<>();
+
+ /** Checkpoint end future. */
+ private final CompletableFuture<Boolean> cpEndFut = new CompletableFuture<>();
+
+ /** Future to wait until checkpoint mark phase will be finished and snapshot tasks scheduled. */
+ private final GridFutureAdapter<Void> startedFut = new GridFutureAdapter<>();
+
+ /** Absolute path to save intermediate results of cache partitions of this node. */
+ private volatile File tmpConsIdDir;
+
+ /** Future which will be completed when task requested to be closed. Will be executed on system pool. */
+ private volatile CompletableFuture<Void> closeFut;
+
+ /** The first exception which occurred during snapshot processing (later ones are not stored). */
+ private final AtomicReference<Throwable> err = new AtomicReference<>();
+
+ /** Flag indicating that the task has already been scheduled on a checkpoint. */
+ private final AtomicBoolean started = new AtomicBoolean();
+
+ /**
+ * Creates a task which is already finished with the given error. Such a task is not bound to any
+ * cache context, so all its dependencies are left {@code null}.
+ *
+ * @param e Finished snapshot task future with particular exception.
+ */
+ public SnapshotFutureTask(IgniteCheckedException e) {
+ assert e != null : "Exception for a finished snapshot task must be not null";
+
+ cctx = null;
+ pageStore = null;
+ log = null;
+ srcNodeId = null;
+ snpName = null;
+ tmpSnpWorkDir = null;
+ locBuff = null;
+ ioFactory = null;
+ snpSndr = null;
+ parts = null;
+
+ // Everything that may be awaited is completed with the same error.
+ err.set(e);
+ startedFut.onDone(e);
+ onDone(e);
+ }
+
+ /**
+ * Creates a snapshot task for the given cache groups.
+ *
+ * @param cctx Shared context; its page store must be a {@link FilePageStoreManager}.
+ * @param srcNodeId Id of the node which caused the snapshot operation.
+ * @param snpName Unique identifier of snapshot task.
+ * @param tmpWorkDir Temporary work directory; the task uses its {@code snpName} subdirectory.
+ * @param ioFactory Factory to working with delta as file storage.
+ * @param snpSndr Sender which receives the snapshot data.
+ * @param parts Map of cache groups and its partitions to include into snapshot, if set of partitions
+ * is {@code null} then all OWNING partitions for given cache groups will be included into snapshot.
+ * @param locBuff Thread-local buffer for copy-on-write operations.
+ */
+ public SnapshotFutureTask(
+ GridCacheSharedContext<?, ?> cctx,
+ UUID srcNodeId,
+ String snpName,
+ File tmpWorkDir,
+ FileIOFactory ioFactory,
+ SnapshotSender snpSndr,
+ Map<Integer, Set<Integer>> parts,
+ ThreadLocal<ByteBuffer> locBuff
+ ) {
+ assert snpName != null : "Snapshot name cannot be empty or null.";
+ assert snpSndr != null : "Snapshot sender which handles execution tasks must be not null.";
+ assert snpSndr.executor() != null : "Executor service must be not null.";
+ assert cctx.pageStore() instanceof FilePageStoreManager : "Snapshot task can work only with physical files.";
+
+ this.cctx = cctx;
+ this.srcNodeId = srcNodeId;
+ this.snpName = snpName;
+ this.snpSndr = snpSndr;
+ this.parts = parts;
+ this.ioFactory = ioFactory;
+ this.locBuff = locBuff;
+ this.tmpSnpWorkDir = new File(tmpWorkDir, snpName);
+ this.pageStore = (FilePageStoreManager)cctx.pageStore();
+ this.log = cctx.logger(SnapshotFutureTask.class);
+ }
+
+ /**
+ * @return Snapshot name.
+ */
+ public String snapshotName() {
+ return snpName;
+ }
+
+ /**
+ * @return Node id which triggers this operation.
+ */
+ public UUID sourceNodeId() {
+ return srcNodeId;
+ }
+
+ /**
+ * @return Type of snapshot operation.
+ */
+ public Class<? extends SnapshotSender> type() {
+ return snpSndr.getClass();
+ }
+
+ /**
+ * @return Set of cache groups included into snapshot operation.
+ */
+ public Set<Integer> affectedCacheGroups() {
+ return parts.keySet();
+ }
+
+ /**
+ * @param th An exception which occurred during snapshot processing.
+ */
+ public void acceptException(Throwable th) {
+ if (th == null)
+ return;
+
+ if (err.compareAndSet(null, th))
+ closeAsync();
+
+ startedFut.onDone(th);
+
+ U.warn(log, "Snapshot task has accepted exception to stop: " + th);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
+ for (PageStoreSerialWriter writer : partDeltaWriters.values())
+ U.closeQuiet(writer);
+
+ for (CacheConfigurationSender ccfgSndr : ccfgSndrs)
+ U.closeQuiet(ccfgSndr);
+
+ snpSndr.close(err);
+
+ if (tmpConsIdDir != null)
+ U.delete(tmpConsIdDir);
+
+ // Delete snapshot directory if no other files exists.
+ try {
+ if (U.fileCount(tmpSnpWorkDir.toPath()) == 0 || err != null)
+ U.delete(tmpSnpWorkDir.toPath());
+ }
+ catch (IOException e) {
+ log.error("Snapshot directory doesn't exist [snpName=" + snpName + ", dir=" + tmpSnpWorkDir + ']');
+ }
+
+ if (err != null)
+ startedFut.onDone(err);
+
+ return super.onDone(res, err);
+ }
+
+ /**
+ * @throws IgniteCheckedException If fails.
+ */
+ public void awaitStarted() throws IgniteCheckedException {
+ startedFut.get();
+ }
+
+ /**
+ * @return {@code true} if current task requested to be stopped.
+ */
+ private boolean stopping() {
+ return err.get() != null;
+ }
+
+ /**
+ * Initiates snapshot task.
+ *
+ * @return {@code true} if task started by this call.
+ */
+ public boolean start() {
+ if (stopping())
+ return false;
+
+ try {
+ if (!started.compareAndSet(false, true))
+ return false;
+
+ tmpConsIdDir = U.resolveWorkDirectory(tmpSnpWorkDir.getAbsolutePath(),
+ databaseRelativePath(cctx.kernalContext().pdsFolderResolver().resolveFolders().folderName()),
+ false);
+
+ for (Integer grpId : parts.keySet()) {
+ CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
+
+ if (gctx == null)
+ throw new IgniteCheckedException("Cache group context not found: " + grpId);
+
+ if (!CU.isPersistentCache(gctx.config(), cctx.kernalContext().config().getDataStorageConfiguration()))
+ throw new IgniteCheckedException("In-memory cache groups are not allowed to be snapshot: " + grpId);
+
+ if (gctx.config().isEncryptionEnabled())
+ throw new IgniteCheckedException("Encrypted cache groups are not allowed to be snapshot: " + grpId);
+
+ // Create cache group snapshot directory on start in a single thread.
+ U.ensureDirectory(cacheWorkDir(tmpConsIdDir, cacheDirName(gctx.config())),
+ "directory for snapshotting cache group",
+ log);
+ }
+
+ startedFut.listen(f ->
+ ((GridCacheDatabaseSharedManager)cctx.database()).removeCheckpointListener(this)
+ );
+
+ // Listener will be removed right after first execution.
+ ((GridCacheDatabaseSharedManager)cctx.database()).addCheckpointListener(this);
+
+ if (log.isInfoEnabled()) {
+ log.info("Snapshot operation is scheduled on local node and will be handled by the checkpoint " +
+ "listener [sctx=" + this + ", topVer=" + cctx.discovery().topologyVersionEx() + ']');
+ }
+ }
+ catch (IgniteCheckedException e) {
+ acceptException(e);
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void beforeCheckpointBegin(Context ctx) {
+ if (stopping())
+ return;
+
+ ctx.finishedStateFut().listen(f -> {
+ if (f.error() == null)
+ cpEndFut.complete(true);
+ else
+ cpEndFut.completeExceptionally(f.error());
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMarkCheckpointBegin(Context ctx) {
+ // Write lock is hold. Partition pages counters has been collected under write lock.
+ if (stopping())
+ return;
+
+ try {
+ for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
+ int grpId = e.getKey();
+ Set<Integer> grpParts = e.getValue();
+
+ GridDhtPartitionTopology top = cctx.cache().cacheGroup(grpId).topology();
+
+ Iterator<GridDhtLocalPartition> iter;
+
+ if (grpParts == null)
+ iter = top.currentLocalPartitions().iterator();
+ else {
+ if (grpParts.contains(INDEX_PARTITION)) {
+ throw new IgniteCheckedException("Index partition cannot be included into snapshot if " +
+ " set of cache group partitions has been explicitly provided [grpId=" + grpId + ']');
+ }
+
+ iter = F.iterator(grpParts, top::localPartition, false);
+ }
+
+ Set<Integer> owning = new HashSet<>();
+ Set<Integer> missed = new HashSet<>();
+
+ // Iterate over partitions in particular cache group.
+ while (iter.hasNext()) {
+ GridDhtLocalPartition part = iter.next();
+
+ // Partition can be in MOVING\RENTING states.
+ // Index partition will be excluded if not all partition OWNING.
+ // There is no data assigned to partition, thus it haven't been created yet.
+ if (part.state() == GridDhtPartitionState.OWNING)
+ owning.add(part.id());
+ else
+ missed.add(part.id());
+ }
+
+ if (grpParts != null) {
+ // Partition has been provided for cache group, but some of them are not in OWNING state.
+ // Exit with an error.
+ if (!missed.isEmpty()) {
+ throw new IgniteCheckedException("Snapshot operation cancelled due to " +
+ "not all of requested partitions has OWNING state on local node [grpId=" + grpId +
+ ", missed" + missed + ']');
+ }
+ }
+ else {
+ // Partitions has not been provided for snapshot task and all partitions have
+ // OWNING state, so index partition must be included into snapshot.
+ if (!missed.isEmpty()) {
+ log.warning("All local cache group partitions in OWNING state have been included into a snapshot. " +
+ "Partitions which have different states skipped. Index partitions has also been skipped " +
+ "[snpName=" + snpName + ", grpId=" + grpId + ", missed=" + missed + ']');
+ }
+ else if (missed.isEmpty() && cctx.kernalContext().query().moduleEnabled())
+ owning.add(INDEX_PARTITION);
+ }
+
+ processed.put(grpId, owning);
+ }
+
+ List<CacheConfiguration<?, ?>> ccfgs = new ArrayList<>();
+
+ for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+ int grpId = e.getKey();
+
+ CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
+
+ if (gctx == null) {
+ throw new IgniteCheckedException("Cache group context has not found " +
+ "due to the cache group is stopped: " + grpId);
+ }
+
+ for (int partId : e.getValue()) {
+ GroupPartitionId pair = new GroupPartitionId(grpId, partId);
+
+ PageStore store = pageStore.getStore(grpId, partId);
+
+ partDeltaWriters.put(pair,
+ new PageStoreSerialWriter(store,
+ partDeltaFile(cacheWorkDir(tmpConsIdDir, cacheDirName(gctx.config())), partId)));
+
+ partFileLengths.put(pair, store.size());
+ }
+
+ ccfgs.add(gctx.config());
+ }
+
+ pageStore.readConfigurationFiles(ccfgs,
+ (ccfg, ccfgFile) -> ccfgSndrs.add(new CacheConfigurationSender(ccfg.getName(), cacheDirName(ccfg), ccfgFile)));
+ }
+ catch (IgniteCheckedException e) {
+ acceptException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCheckpointBegin(Context ctx) {
+ if (stopping())
+ return;
+
+ assert !processed.isEmpty() : "Partitions to process must be collected under checkpoint mark phase";
+
+ wrapExceptionIfStarted(() -> snpSndr.init(processed.values().stream().mapToInt(Set::size).sum()))
+ .run();
+
+ // Snapshot task can now be started since checkpoint write lock released and
+ // there is no error happen on task init.
+ if (!startedFut.onDone())
+ return;
+
+ // Submit all tasks for partitions and deltas processing.
+ List<CompletableFuture<Void>> futs = new ArrayList<>();
+
+ if (log.isInfoEnabled())
+ log.info("Submit partition processing tasks with partition allocated lengths: " + partFileLengths);
+
+ Collection<BinaryType> binTypesCopy = cctx.kernalContext()
+ .cacheObjects()
+ .metadata(Collections.emptyList())
+ .values();
+
+ // Process binary meta.
+ futs.add(CompletableFuture.runAsync(
+ wrapExceptionIfStarted(() -> snpSndr.sendBinaryMeta(binTypesCopy)),
+ snpSndr.executor()));
+
+ List<Map<Integer, MappedName>> mappingsCopy = cctx.kernalContext()
+ .marshallerContext()
+ .getCachedMappings();
+
+ // Process marshaller meta.
+ futs.add(CompletableFuture.runAsync(
+ wrapExceptionIfStarted(() -> snpSndr.sendMarshallerMeta(mappingsCopy)),
+ snpSndr.executor()));
+
+ // Send configuration files of all cache groups.
+ for (CacheConfigurationSender ccfgSndr : ccfgSndrs)
+ futs.add(CompletableFuture.runAsync(wrapExceptionIfStarted(ccfgSndr::sendCacheConfig), snpSndr.executor()));
+
+ for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+ int grpId = e.getKey();
+
+ CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
+
+ if (gctx == null) {
+ acceptException(new IgniteCheckedException("Cache group context has not found " +
+ "due to the cache group is stopped: " + grpId));
+
+ break;
+ }
+
+ // Process partitions for a particular cache group.
+ for (int partId : e.getValue()) {
+ GroupPartitionId pair = new GroupPartitionId(grpId, partId);
+
+ CacheConfiguration<?, ?> ccfg = gctx.config();
+
+ assert ccfg != null : "Cache configuration cannot be empty on snapshot creation: " + pair;
+
+ String cacheDirName = cacheDirName(ccfg);
+ Long partLen = partFileLengths.get(pair);
+
+ CompletableFuture<Void> fut0 = CompletableFuture.runAsync(
+ wrapExceptionIfStarted(() -> {
+ snpSndr.sendPart(
+ getPartitionFile(pageStore.workDir(), cacheDirName, partId),
+ cacheDirName,
+ pair,
+ partLen);
+
+ // Stop partition writer.
+ partDeltaWriters.get(pair).markPartitionProcessed();
+ }),
+ snpSndr.executor())
+ // Wait for the completion of both futures - checkpoint end, copy partition.
+ .runAfterBothAsync(cpEndFut,
+ wrapExceptionIfStarted(() -> {
+ File delta = partDeltaWriters.get(pair).deltaFile;
+
+ try {
+ // Atomically creates a new, empty delta file if and only if
+ // a file with this name does not yet exist.
+ delta.createNewFile();
+ }
+ catch (IOException ex) {
+ throw new IgniteCheckedException(ex);
+ }
+
+ snpSndr.sendDelta(delta, cacheDirName, pair);
+
+ boolean deleted = delta.delete();
+
+ assert deleted;
+ }),
+ snpSndr.executor());
+
+ futs.add(fut0);
+ }
+ }
+
+ int futsSize = futs.size();
+
+ CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize]))
+ .whenComplete((res, t) -> {
+ assert t == null : "Exception must never be thrown since a wrapper is used " +
+ "for each snapshot task: " + t;
+
+ closeAsync();
+ });
+ }
+
+ /**
+ * @param exec Runnable task to execute.
+ * @return Wrapped task.
+ */
+ private Runnable wrapExceptionIfStarted(IgniteThrowableRunner exec) {
+ return () -> {
+ if (stopping())
+ return;
+
+ try {
+ exec.run();
+ }
+ catch (Throwable t) {
+ acceptException(t);
+ }
+ };
+ }
+
+ /**
+ * @return Future which will be completed when operations truly stopped.
+ */
+ public synchronized CompletableFuture<Void> closeAsync() {
+ if (closeFut == null) {
+ Throwable err0 = err.get();
+
+ closeFut = CompletableFuture.runAsync(() -> onDone(true, err0),
+ cctx.kernalContext().getSystemExecutorService());
+ }
+
+ return closeFut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean cancel() {
+ acceptException(new IgniteCheckedException("Snapshot operation has been cancelled by external process " +
+ "[snpName=" + snpName + ']'));
+
+ try {
+ closeAsync().get();
+ }
+ catch (InterruptedException | ExecutionException e) {
+ U.error(log, "SnapshotFutureTask cancellation failed", e);
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ SnapshotFutureTask ctx = (SnapshotFutureTask)o;
+
+ return snpName.equals(ctx.snpName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(snpName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SnapshotFutureTask.class, this);
+ }
+
+ /** */
+ private class CacheConfigurationSender implements BiConsumer<String, File>, Closeable {
+ /** Cache name associated with configuration file. */
+ private final String cacheName;
+
+ /** Cache directory associated with configuration file. */
+ private final String cacheDirName;
+
+ /** Lock for cache configuration processing. */
+ private final Lock lock = new ReentrantLock();
+
+ /** Configuration file to send. */
+ private volatile File ccfgFile;
+
+ /** {@code true} if configuration file already sent. */
+ private volatile boolean sent;
+
+ /**
+ * {@code true} if an old configuration file written to the temp directory and
+ * waiting to be sent.
+ */
+ private volatile boolean fromTemp;
+
+ /**
+ * @param ccfgFile Cache configuration to send.
+ * @param cacheDirName Cache directory.
+ */
+ public CacheConfigurationSender(String cacheName, String cacheDirName, File ccfgFile) {
+ this.cacheName = cacheName;
+ this.cacheDirName = cacheDirName;
+ this.ccfgFile = ccfgFile;
+
+ pageStore.addConfigurationChangeListener(this);
+ }
+
+ /**
+ * Send the original cache configuration file or the temp one instead saved due to
+ * concurrent configuration change operation happened (e.g. SQL add/drop column).
+ */
+ public void sendCacheConfig() {
+ lock.lock();
+
+ try {
+ snpSndr.sendCacheConfig(ccfgFile, cacheDirName);
+
+ close0();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void accept(String cacheName, File ccfgFile) {
+ assert ccfgFile.exists() :
+ "Cache configuration file must exist [cacheName=" + cacheName +
+ ", ccfgFile=" + ccfgFile.getAbsolutePath() + ']';
+
+ if (stopping())
+ return;
+
+ if (!cacheName.equals(this.cacheName) || sent || fromTemp)
+ return;
+
+ lock.lock();
+
+ try {
+ if (sent || fromTemp)
+ return;
+
+ File cacheWorkDir = cacheWorkDir(tmpSnpWorkDir, cacheDirName);
+
+ if (!U.mkdirs(cacheWorkDir))
+ throw new IOException("Unable to create temp directory to copy original configuration file: " + cacheWorkDir);
+
+ File newCcfgFile = new File(cacheWorkDir, ccfgFile.getName());
+ newCcfgFile.createNewFile();
+
+ copy(ioFactory, ccfgFile, newCcfgFile, ccfgFile.length());
+
+ this.ccfgFile = newCcfgFile;
+ fromTemp = true;
+ }
+ catch (IOException e) {
+ acceptException(e);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** Close writer and remove listener. */
+ private void close0() {
+ sent = true;
+ pageStore.removeConfigurationChangeListener(this);
+
+ if (fromTemp)
+ U.delete(ccfgFile);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ lock.lock();
+
+ try {
+ close0();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ /** */
+ private class PageStoreSerialWriter implements PageWriteListener, Closeable {
+ /** Page store to which current writer is related to. */
+ @GridToStringExclude
+ private final PageStore store;
+
+ /** Partition delta file to store delta pages into. */
+ private final File deltaFile;
+
+ /** Busy lock to protect write operations. */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /** {@code true} if need the original page from PageStore instead of given buffer. */
+ @GridToStringExclude
+ private final BooleanSupplier checkpointComplete = () ->
+ cpEndFut.isDone() && !cpEndFut.isCompletedExceptionally();
+
+ /**
+ * Array of bits. 1 - means pages written, 0 - the otherwise.
+ * Size of array can be estimated only under checkpoint write lock.
+ */
+ private final AtomicBitSet writtenPages;
+
+ /** IO over the underlying delta file. */
+ @GridToStringExclude
+ private volatile FileIO deltaFileIo;
+
+ /** {@code true} if partition file has been copied to external resource. */
+ private volatile boolean partProcessed;
+
+ /**
+ * @param store Partition page store.
+ * @param deltaFile Destination file to write pages to.
+ */
+ public PageStoreSerialWriter(PageStore store, File deltaFile) {
+ assert store != null;
+ assert cctx.database().checkpointLockIsHeldByThread();
+
+ this.deltaFile = deltaFile;
+ this.store = store;
+ // It is important to init {@link AtomicBitSet} under the checkpoint write-lock.
+ // This guarantee us that no pages will be modified and it's safe to init pages
+ // list which needs to be processed.
+ writtenPages = new AtomicBitSet(store.pages());
+
+ store.addWriteListener(this);
+ }
+
+ /**
+ * @return {@code true} if writer is stopped and cannot write pages.
+ */
+ public boolean stopped() {
+ return (checkpointComplete.getAsBoolean() && partProcessed) || stopping();
+ }
+
+ /**
+ * Mark partition has been processed by another thread.
+ */
+ public void markPartitionProcessed() {
+ lock.writeLock().lock();
+
+ try {
+ partProcessed = true;
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void accept(long pageId, ByteBuffer buf) {
+ assert buf.position() == 0 : buf.position();
+ assert buf.order() == ByteOrder.nativeOrder() : buf.order();
+
+ if (deltaFileIo == null) {
+ lock.writeLock().lock();
+
+ try {
+ if (stopped())
+ return;
+
+ if (deltaFileIo == null)
+ deltaFileIo = ioFactory.create(deltaFile);
+ }
+ catch (IOException e) {
+ acceptException(e);
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ int pageIdx = -1;
+
+ lock.readLock().lock();
+
+ try {
+ if (stopped())
+ return;
+
+ pageIdx = PageIdUtils.pageIndex(pageId);
+
+ if (checkpointComplete.getAsBoolean()) {
+ // Page already written.
+ if (!writtenPages.touch(pageIdx))
+ return;
+
+ final ByteBuffer locBuf = locBuff.get();
+
+ assert locBuf.capacity() == store.getPageSize();
+
+ locBuf.clear();
+
+ if (!store.read(pageId, locBuf, true))
+ return;
+
+ locBuf.flip();
+
+ writePage0(pageId, locBuf);
+ }
+ else {
+ // Direct buffer is needs to be written, associated checkpoint not finished yet.
+ writePage0(pageId, buf);
+
+ // Page marked as written to delta file, so there is no need to
+ // copy it from file when the first checkpoint associated with
+ // current snapshot task ends.
+ writtenPages.touch(pageIdx);
+ }
+ }
+ catch (Throwable ex) {
+ acceptException(new IgniteCheckedException("Error during writing pages to delta partition file " +
+ "[pageIdx=" + pageIdx + ", writer=" + this + ']', ex));
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * @param pageId Page ID.
+ * @param pageBuf Page buffer to write.
+ * @throws IOException If page writing failed (IO error occurred).
+ */
+ private void writePage0(long pageId, ByteBuffer pageBuf) throws IOException {
+ assert deltaFileIo != null : "Delta pages storage is not inited: " + this;
+ assert pageBuf.position() == 0;
+ assert pageBuf.order() == ByteOrder.nativeOrder() : "Page buffer order " + pageBuf.order()
+ + " should be same with " + ByteOrder.nativeOrder();
+
+ if (log.isDebugEnabled()) {
+ log.debug("onPageWrite [pageId=" + pageId +
+ ", pageIdBuff=" + PageIO.getPageId(pageBuf) +
+ ", fileSize=" + deltaFileIo.size() +
+ ", crcBuff=" + FastCrc.calcCrc(pageBuf, pageBuf.limit()) +
+ ", crcPage=" + PageIO.getCrc(pageBuf) + ']');
+
+ pageBuf.rewind();
+ }
+
+ // Write buffer to the end of the file.
+ deltaFileIo.writeFully(pageBuf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ lock.writeLock().lock();
+
+ try {
+ U.closeQuiet(deltaFileIo);
+
+ deltaFileIo = null;
+
+ store.removeWriteListener(this);
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PageStoreSerialWriter.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class AtomicBitSet {
+ /** Container of bits. */
+ private final AtomicIntegerArray arr;
+
+ /** Size of array of bits. */
+ private final int size;
+
+ /**
+ * @param size Size of array.
+ */
+ public AtomicBitSet(int size) {
+ this.size = size;
+
+ arr = new AtomicIntegerArray((size + 31) >>> 5);
+ }
+
+ /**
+ * @param off Bit position to change.
+ * @return {@code true} if bit has been set,
+ * {@code false} if bit changed by another thread or out of range.
+ */
+ public boolean touch(long off) {
+ if (off >= size)
+ return false;
+
+ int bit = 1 << off;
+ int bucket = (int)(off >>> 5);
+
+ while (true) {
+ int cur = arr.get(bucket);
+ int val = cur | bit;
+
+ if (cur == val)
+ return false;
+
+ if (arr.compareAndSet(bucket, cur, val))
+ return true;
+ }
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java
new file mode 100644
index 0000000..e6b6a72
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import org.apache.ignite.internal.GridKernalContextImpl;
+import org.apache.ignite.mxbean.SnapshotMXBean;
+
+/**
+ * Snapshot MBean features.
+ */
+public class SnapshotMXBeanImpl implements SnapshotMXBean {
+ /** Instance of snapshot cache shared manager. */
+ private final IgniteSnapshotManager mgr;
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public SnapshotMXBeanImpl(GridKernalContextImpl ctx) {
+ mgr = ctx.cache().context().snapshotMgr();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void createSnapshot(String snpName) {
+ mgr.createSnapshot(snpName);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotSender.java
new file mode 100644
index 0000000..c48f899
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotSender.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+abstract class SnapshotSender {
+ /** Busy processing lock. */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /** Executor to run operation at. */
+ private final Executor exec;
+
+ /** {@code true} if sender is currently working. */
+ private volatile boolean closed;
+
+ /** Ignite logger to use. */
+ protected final IgniteLogger log;
+
+ /**
+ * @param log Ignite logger to use.
+ */
+ protected SnapshotSender(IgniteLogger log, Executor exec) {
+ this.exec = exec;
+ this.log = log.getLogger(SnapshotSender.class);
+ }
+
+ /**
+ * @return Executor to run internal operations on.
+ */
+ public Executor executor() {
+ return exec;
+ }
+
+ /**
+ * @param mappings Local node marshaller mappings.
+ */
+ public final void sendMarshallerMeta(List<Map<Integer, MappedName>> mappings) {
+ if (!lock.readLock().tryLock())
+ return;
+
+ try {
+ if (closed)
+ return;
+
+ if (mappings == null)
+ return;
+
+ sendMarshallerMeta0(mappings);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * @param types Collection of known binary types.
+ */
+ public final void sendBinaryMeta(Collection<BinaryType> types) {
+ if (!lock.readLock().tryLock())
+ return;
+
+ try {
+ if (closed)
+ return;
+
+ if (types == null)
+ return;
+
+ sendBinaryMeta0(types);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * @param ccfg Cache configuration file.
+ * @param cacheDirName Cache group directory name.
+ */
+ public final void sendCacheConfig(File ccfg, String cacheDirName) {
+ if (!lock.readLock().tryLock())
+ return;
+
+ try {
+ if (closed)
+ return;
+
+ sendCacheConfig0(ccfg, cacheDirName);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * @param part Partition file to send.
+ * @param cacheDirName Cache group directory name.
+ * @param pair Group id with partition id pair.
+ * @param length Partition length.
+ */
+ public final void sendPart(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+ if (!lock.readLock().tryLock())
+ return;
+
+ try {
+ if (closed)
+ return;
+
+ sendPart0(part, cacheDirName, pair, length);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * @param delta Delta pages file.
+ * @param cacheDirName Cache group directory name.
+ * @param pair Group id with partition id pair.
+ */
+ public final void sendDelta(File delta, String cacheDirName, GroupPartitionId pair) {
+ if (!lock.readLock().tryLock())
+ return;
+
+ try {
+ if (closed)
+ return;
+
+ sendDelta0(delta, cacheDirName, pair);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Closes this snapshot sender and releases any resources associated with it.
+ * If the sender is already closed then invoking this method has no effect.
+ *
+ * @param th An exception occurred during snapshot operation processing.
+ */
+ public final void close(@Nullable Throwable th) {
+ lock.writeLock().lock();
+
+ try {
+ close0(th);
+
+ closed = true;
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * @param partsCnt Number of objects to process.
+ */
+ protected abstract void init(int partsCnt);
+
+ /**
+ * @param part Partition file to send.
+ * @param cacheDirName Cache group directory name.
+ * @param pair Group id with partition id pair.
+ * @param length Partition length.
+ */
+ protected abstract void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length);
+
+ /**
+ * @param delta Delta pages file.
+ * @param cacheDirName Cache group directory name.
+ * @param pair Group id with partition id pair.
+ */
+ protected abstract void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair);
+
+ /**
+ * @param mappings Local node marshaller mappings.
+ */
+ protected void sendMarshallerMeta0(List<Map<Integer, MappedName>> mappings) {
+ // No-op by default.
+ }
+
+ /**
+ * @param types Collection of known binary types.
+ */
+ protected void sendBinaryMeta0(Collection<BinaryType> types) {
+ // No-op by default.
+ }
+
+ /**
+ * @param ccfg Cache configuration file.
+ * @param cacheDirName Cache group directory name.
+ */
+ protected void sendCacheConfig0(File ccfg, String cacheDirName) {
+ // No-op by default.
+ }
+
+ /**
+ * Closes this snapshot sender and releases any resources associated with it.
+ * If the sender is already closed then invoking this method has no effect.
+ */
+ protected void close0(@Nullable Throwable th) {
+ // No-op by default.
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index 63c282f..cb5bfc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -376,7 +376,7 @@ public class IgniteWalIteratorFactory {
return new GridCacheSharedContext<>(
kernalCtx, null, null, null,
- null, null, null, dbMgr, null,
+ null, null, null, dbMgr, null, null,
null, null, null, null, null,
null, null, null, null, null, null
);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 88203c5..7d0bee1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -106,13 +106,12 @@ import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.BINARY_META_FOLDER;
+
/**
* Dummy grid kernal context
*/
public class StandaloneGridKernalContext implements GridKernalContext {
- /** Binary metadata file store folder. */
- public static final String BINARY_META_FOLDER = "binary_meta";
-
/** Config for fake Ignite instance. */
private final IgniteConfiguration cfg;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index c029705..62e15df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -17,10 +17,10 @@
package org.apache.ignite.internal.processors.cacheobject;
+import java.io.File;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
-
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -301,6 +301,12 @@ public interface IgniteCacheObjectProcessor extends GridProcessor {
public Collection<BinaryType> metadata() throws IgniteException;
/**
+ * @param types Collection of binary types to write to.
+ * @param dir Destination directory.
+ */
+ public void saveMetadata(Collection<BinaryType> types, File dir);
+
+ /**
* @param typeName Type name.
* @param ord ordinal.
* @return Enum object.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index dad0b44..663a3f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -347,20 +347,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
* @param mappings Incoming marshaller mappings.
*/
private void processIncomingMappings(List<Map<Integer, MappedName>> mappings) {
- if (mappings != null) {
- for (int i = 0; i < mappings.size(); i++) {
- Map<Integer, MappedName> map;
-
- if ((map = mappings.get(i)) != null) {
- try {
- marshallerCtx.onMappingDataReceived((byte)i, map);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to process marshaller mapping data", e);
- }
- }
- }
- }
+ marshallerCtx.onMappingDataReceived(log, mappings);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index cf05916..8d608f1 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -367,8 +367,8 @@ public abstract class IgniteUtils {
/** Thread dump message. */
public static final String THREAD_DUMP_MSG = "Thread dump at ";
- /** Correct Mbean cache name pattern. */
- private static Pattern MBEAN_CACHE_NAME_PATTERN = Pattern.compile("^[a-zA-Z_0-9]+$");
+ /** Alphanumeric with underscore regexp pattern. */
+ private static final Pattern ALPHANUMERIC_UNDERSCORE_PATTERN = Pattern.compile("^[a-zA-Z_0-9]+$");
/** Project home directory. */
private static volatile GridTuple<String> ggHome;
@@ -4793,13 +4793,21 @@ public abstract class IgniteUtils {
* @return An escaped string.
*/
private static String escapeObjectNameValue(String s) {
- if (MBEAN_CACHE_NAME_PATTERN.matcher(s).matches())
+ if (alphanumericUnderscore(s))
return s;
return '\"' + s.replaceAll("[\\\\\"?*]", "\\\\$0") + '\"';
}
/**
+ * @param s String to check.
+ * @return {@code true} if given string contains only alphanumeric and underscore symbols.
+ */
+ public static boolean alphanumericUnderscore(String s) {
+ return ALPHANUMERIC_UNDERSCORE_PATTERN.matcher(s).matches();
+ }
+
+ /**
* Registers MBean with the server.
*
* @param <T> Type of mbean.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
index 3bc719d..93f4725 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -33,6 +34,7 @@ import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.CI3;
@@ -80,17 +82,41 @@ public class DistributedProcess<I extends Serializable, R extends Serializable>
/** Logger. */
private final IgniteLogger log;
+ /** Factory which creates custom {@link InitMessage} for distributed process initialization. */
+ private BiFunction<UUID, I, ? extends InitMessage<I>> initMsgFactory;
+
+    /**
+     * Creates a distributed process which is initiated with a plain {@link InitMessage}.
+     *
+     * @param ctx Kernal context.
+     * @param type Process type.
+     * @param exec Execute action and returns future with the single node result to send to the coordinator.
+     * @param finish Finish process closure. Called on each node when all single nodes results received.
+     */
+    public DistributedProcess(
+        GridKernalContext ctx,
+        DistributedProcessType type,
+        Function<I, IgniteInternalFuture<R>> exec,
+        CI3<UUID, Map<UUID, R>, Map<UUID, Exception>> finish
+    ) {
+        this(ctx, type, exec, finish, (procId, initReq) -> new InitMessage<>(procId, type, initReq));
+    }
+
/**
* @param ctx Kernal context.
* @param type Process type.
* @param exec Execute action and returns future with the single node result to send to the coordinator.
* @param finish Finish process closure. Called on each node when all single nodes results received.
+ * @param initMsgFactory Factory which creates custom {@link InitMessage} for distributed process initialization.
*/
- public DistributedProcess(GridKernalContext ctx, DistributedProcessType type,
+ public DistributedProcess(
+ GridKernalContext ctx,
+ DistributedProcessType type,
Function<I, IgniteInternalFuture<R>> exec,
- CI3<UUID, Map<UUID, R>, Map<UUID, Exception>> finish) {
+ CI3<UUID, Map<UUID, R>, Map<UUID, Exception>> finish,
+ BiFunction<UUID, I, ? extends InitMessage<I>> initMsgFactory
+ ) {
this.ctx = ctx;
this.type = type;
+ this.initMsgFactory = initMsgFactory;
log = ctx.log(getClass());
@@ -218,9 +244,7 @@ public class DistributedProcess<I extends Serializable, R extends Serializable>
*/
public void start(UUID id, I req) {
try {
- InitMessage<I> msg = new InitMessage<>(id, type, req);
-
- ctx.discovery().sendCustomEvent(msg);
+ ctx.discovery().sendCustomEvent(initMsgFactory.apply(id, req));
}
catch (IgniteCheckedException e) {
log.warning("Unable to start process.", e);
@@ -379,6 +403,20 @@ public class DistributedProcess<I extends Serializable, R extends Serializable>
*
* @see GridEncryptionManager
*/
- MASTER_KEY_CHANGE_FINISH
+ MASTER_KEY_CHANGE_FINISH,
+
+ /**
+ * Start snapshot procedure.
+ *
+ * @see IgniteSnapshotManager
+ */
+ START_SNAPSHOT,
+
+ /**
+ * End snapshot procedure.
+ *
+ * @see IgniteSnapshotManager
+ */
+ END_SNAPSHOT
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java
index e23ba36..4e43f52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java
@@ -81,7 +81,7 @@ public class InitMessage<I extends Serializable> implements DiscoveryCustomMessa
/** {@inheritDoc} */
@Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
DiscoCache discoCache) {
- return null;
+ return mgr.createDiscoCacheOnCacheChange(topVer, discoCache);
}
/** @return Process id. */
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java
new file mode 100644
index 0000000..22816d2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mxbean;
+
+import org.apache.ignite.IgniteSnapshot;
+
+/**
+ * Snapshot features MBean.
+ */
+@MXBeanDescription("MBean that provides access for snapshot features.")
+public interface SnapshotMXBean {
+    /**
+     * Creates the cluster-wide snapshot with the given name asynchronously.
+     *
+     * @param snpName Name of the snapshot to create.
+     * @see IgniteSnapshot#createSnapshot(String)
+     */
+    @MXBeanDescription("Create cluster-wide snapshot.")
+    public void createSnapshot(@MXBeanParameter(name = "snpName", description = "Snapshot name.") String snpName);
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 8af5747..3123911 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -404,7 +404,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
/**
*
*/
- protected void checkPartitionMapExchangeFinished() {
+ public static void checkPartitionMapExchangeFinished() {
for (Ignite g : G.allGrids()) {
IgniteKernal g0 = (IgniteKernal)g;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataAsyncWritingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataAsyncWritingTest.java
index 7045d98..2289d4e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataAsyncWritingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataAsyncWritingTest.java
@@ -58,6 +58,7 @@ import org.junit.Test;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.BINARY_META_FOLDER;
import static org.apache.ignite.testframework.GridTestUtils.suppressException;
/**
@@ -546,7 +547,7 @@ public class IgnitePdsBinaryMetadataAsyncWritingTest extends GridCommonAbstractT
*/
private void cleanBinaryMetaFolderForNode(String consId) throws IgniteCheckedException {
String dfltWorkDir = U.defaultWorkDirectory();
- File metaDir = U.resolveWorkDirectory(dfltWorkDir, "binary_meta", false);
+ File metaDir = U.resolveWorkDirectory(dfltWorkDir, BINARY_META_FOLDER, false);
for (File subDir : metaDir.listFiles()) {
if (subDir.getName().contains(consId)) {
@@ -661,7 +662,7 @@ public class IgnitePdsBinaryMetadataAsyncWritingTest extends GridCommonAbstractT
/** */
private static boolean isBinaryMetaFile(File file) {
- return file.getPath().contains("binary_meta");
+ return file.getPath().contains(BINARY_META_FOLDER);
}
/** */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataOnClusterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataOnClusterRestartTest.java
index 69601b9..47b23a09 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataOnClusterRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataOnClusterRestartTest.java
@@ -45,6 +45,8 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.BINARY_META_FOLDER;
+
/**
*
*/
@@ -364,8 +366,8 @@ public class IgnitePdsBinaryMetadataOnClusterRestartTest extends GridCommonAbstr
) throws Exception {
String workDir = U.defaultWorkDirectory();
- Path fromFile = Paths.get(workDir, fromWorkDir, "binary_meta", fromConsId, fileName);
- Path toFile = Paths.get(workDir, toWorkDir, "binary_meta", toConsId, fileName);
+ Path fromFile = Paths.get(workDir, fromWorkDir, BINARY_META_FOLDER, fromConsId, fileName);
+ Path toFile = Paths.get(workDir, toWorkDir, BINARY_META_FOLDER, toConsId, fileName);
Files.copy(fromFile, toFile, StandardCopyOption.REPLACE_EXISTING);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNoSpaceLeftOnDeviceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNoSpaceLeftOnDeviceTest.java
index 0e01409..fd8b797 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNoSpaceLeftOnDeviceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNoSpaceLeftOnDeviceTest.java
@@ -31,11 +31,12 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
-import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.BINARY_META_FOLDER;
+
/**
*
*/
@@ -133,7 +134,7 @@ public class IgnitePdsNoSpaceLeftOnDeviceTest extends GridCommonAbstractTest {
@Override public FileIO create(File file, OpenOption... modes) throws IOException {
if (unluckyConsistentId.get() != null
&& file.getAbsolutePath().contains(unluckyConsistentId.get())
- && file.getAbsolutePath().contains(StandaloneGridKernalContext.BINARY_META_FOLDER))
+ && file.getAbsolutePath().contains(BINARY_META_FOLDER))
throw new IOException("No space left on device");
return delegateFactory.create(file, modes);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
index 230cb5c..9ff5914 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
@@ -150,6 +150,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest {
null,
null,
null,
+ null,
null)
).createSerializer(serVer);
@@ -470,6 +471,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest {
null,
null,
null,
+ null,
new GridCacheIoManager(),
null,
null,
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
index 5549d69..f530e80 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
@@ -95,6 +95,7 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest {
null,
null,
null,
+ null,
new CacheDiagnosticManager()
);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
index 9125e96..106b9ec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
@@ -94,6 +94,7 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest
null,
null,
null,
+ null,
null
);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
index 0234196..c575f6d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
@@ -110,6 +110,7 @@ public class IndexStoragePageMemoryImplTest extends IndexStorageSelfTest {
null,
null,
null,
+ null,
new CacheDiagnosticManager()
);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
index 5abccf4..5105489 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
@@ -100,6 +100,7 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
null,
null,
null,
+ null,
null
);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index 595eac0..4be75b7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -620,6 +620,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
null,
null,
null,
+ null,
null
);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
new file mode 100644
index 0000000..2c59eb8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+
+import static java.nio.file.Files.newDirectoryStream;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_DIRECTORY;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DFLT_SNAPSHOT_TMP_DIR;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Base snapshot tests.
+ */
+public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
+ /** Default snapshot name. */
+ protected static final String SNAPSHOT_NAME = "testSnapshot";
+
+ /** Default number of partitions for cache. */
+ protected static final int CACHE_PARTS_COUNT = 8;
+
+ /** Number of cache keys to pre-create at node start. */
+ protected static final int CACHE_KEYS_RANGE = 1024;
+
+ /** Configuration for the 'default' cache. */
+ protected volatile CacheConfiguration<Integer, Integer> dfltCacheCfg;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(igniteInstanceName);
+
+        // The blocking SPI reuses the IP finder of the discovery SPI configured by the parent test class.
+        TcpDiscoverySpi blockingSpi = new BlockingCustomMessageDiscoverySpi();
+        blockingSpi.setIpFinder(((TcpDiscoverySpi)igniteCfg.getDiscoverySpi()).getIpFinder());
+
+        DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration()
+            .setMaxSize(100L * 1024 * 1024)
+            .setPersistenceEnabled(true);
+
+        DataStorageConfiguration storageCfg = new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(dfltRegionCfg)
+            .setCheckpointFrequency(3000)
+            .setPageSize(4096);
+
+        igniteCfg.setConsistentId(igniteInstanceName);
+        igniteCfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+        igniteCfg.setDataStorageConfiguration(storageCfg);
+        igniteCfg.setCacheConfiguration(dfltCacheCfg);
+        igniteCfg.setClusterStateOnStart(INACTIVE);
+        // The default work directory is resolved eagerly because grids may later be started from a snapshot.
+        igniteCfg.setWorkDirectory(U.defaultWorkDirectory());
+        igniteCfg.setDiscoverySpi(blockingSpi);
+
+        return igniteCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void cleanPersistenceDir() throws Exception {
+        super.cleanPersistenceDir();
+
+        // Additionally remove the default snapshot directory resolved under the default work directory.
+        File snpDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_SNAPSHOT_DIRECTORY, false);
+
+        U.delete(snpDir);
+    }
+
+    /**
+     * Cleans persistent data and resets the default cache configuration before each test.
+     *
+     * @throws Exception If fails.
+     */
+    @Before
+    public void beforeTestSnapshot() throws Exception {
+        cleanPersistenceDir();
+
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+        dfltCacheCfg = txCacheConfig(ccfg);
+    }
+
+    /**
+     * Checks that no server node left files in its temporary snapshot directory, then stops all grids
+     * and cleans persistent data.
+     *
+     * @throws Exception If fails.
+     */
+    @After
+    public void afterTestSnapshot() throws Exception {
+        try {
+            for (Ignite node : G.allGrids()) {
+                if (node.configuration().isClientMode())
+                    continue;
+
+                FilePageStoreManager storeMgr =
+                    (FilePageStoreManager)((IgniteEx)node).context().cache().context().pageStore();
+
+                Path snpTmpDir = Paths.get(storeMgr.workDir().getAbsolutePath(), DFLT_SNAPSHOT_TMP_DIR);
+
+                assertEquals("Snapshot working directory must be empty at the moment test execution stopped: " + snpTmpDir,
+                    0, U.fileCount(snpTmpDir));
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Turns the given configuration into a partitioned transactional cache with two backups
+     * and {@link #CACHE_PARTS_COUNT} partitions.
+     *
+     * @param ccfg Default cache configuration.
+     * @return Cache configuration.
+     */
+    protected static <K, V> CacheConfiguration<K, V> txCacheConfig(CacheConfiguration<K, V> ccfg) {
+        RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, CACHE_PARTS_COUNT);
+
+        ccfg.setCacheMode(CacheMode.PARTITIONED);
+        ccfg.setBackups(2);
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg.setAffinity(aff);
+
+        return ccfg;
+    }
+
+    /**
+     * Computes a CRC for every partition file found directly in the given cache directory.
+     *
+     * @param cacheDir Cache directory to iterate over partition files.
+     * @return The map of [fileName, checksum].
+     */
+    public static Map<String, Integer> calculateCRC32Partitions(File cacheDir) {
+        assert cacheDir.isDirectory() : cacheDir.getAbsolutePath();
+
+        Map<String, Integer> crcs = new HashMap<>();
+
+        // Only files named like partition files are taken into account.
+        DirectoryStream.Filter<Path> partFilter = p -> {
+            String name = p.toFile().getName();
+
+            return name.startsWith(PART_FILE_PREFIX) && name.endsWith(FILE_SUFFIX);
+        };
+
+        try (DirectoryStream<Path> partFiles = newDirectoryStream(cacheDir.toPath(), partFilter)) {
+            for (Path part : partFiles)
+                crcs.put(part.toFile().getName(), FastCrc.calcCrc(part.toFile()));
+        }
+        catch (IOException e) {
+            throw new IgniteException(e);
+        }
+
+        return crcs;
+    }
+
+    /**
+     * Recursively searches for a directory with the given name.
+     *
+     * @param path Directory to search.
+     * @param dir Directory name.
+     * @return Any directory under {@code path} (including {@code path} itself) named {@code dir},
+     *      or an empty optional if there is none or {@code path} does not exist.
+     * @throws IOException If fails.
+     */
+    public static Optional<Path> searchDirectoryRecursively(Path path, String dir) throws IOException {
+        if (Files.notExists(path))
+            return Optional.empty();
+
+        // The stream returned by Files.walk holds open directory handles until it is closed.
+        try (Stream<Path> files = Files.walk(path)) {
+            return files
+                .filter(Files::isDirectory)
+                // A root path has no file name, so guard against null before comparing.
+                .filter(file -> file.getFileName() != null && dir.equals(file.getFileName().toString()))
+                .findAny();
+        }
+    }
+
+    /**
+     * Starts a single grid with the given cache and loads keys into it.
+     *
+     * @param ccfg Default cache configuration.
+     * @param keys Range of cache keys to insert.
+     * @return Ignite instance.
+     * @throws Exception If fails.
+     */
+    protected IgniteEx startGridWithCache(CacheConfiguration<Integer, Integer> ccfg, int keys) throws Exception {
+        return startGridsWithCache(1, ccfg, keys);
+    }
+
+    /**
+     * Remembers the given configuration as the default one, then starts grids with it and stores
+     * each key as its own value.
+     *
+     * @param grids Number of grids to start.
+     * @param ccfg Default cache configuration.
+     * @param keys Range of cache keys to insert.
+     * @return Ignite instance.
+     * @throws Exception If fails.
+     */
+    protected IgniteEx startGridsWithCache(int grids, CacheConfiguration<Integer, Integer> ccfg, int keys) throws Exception {
+        dfltCacheCfg = ccfg;
+
+        Function<Integer, Integer> valFactory = Function.identity();
+
+        return startGridsWithCache(grids, keys, valFactory, ccfg);
+    }
+
+    /**
+     * Starts grids with the given caches, activates the cluster, writes every key to every cache
+     * and forces a checkpoint.
+     *
+     * @param grids Number of ignite instances to start.
+     * @param keys Number of keys to create.
+     * @param factory Factory which produces values.
+     * @param ccfgs Cache configurations the grids are started with and which are filled with data.
+     * @param <V> Cache value type.
+     * @return Ignite coordinator instance.
+     * @throws Exception If fails.
+     */
+    protected <V> IgniteEx startGridsWithCache(
+        int grids,
+        int keys,
+        Function<Integer, V> factory,
+        CacheConfiguration<Integer, V>... ccfgs
+    ) throws Exception {
+        for (int g = 0; g < grids; g++)
+            startGrid(optimize(getConfiguration(getTestIgniteInstanceName(g))
+                .setCacheConfiguration(ccfgs)));
+
+        IgniteEx ig = grid(0);
+
+        ig.cluster().baselineAutoAdjustEnabled(false);
+        ig.cluster().state(ClusterState.ACTIVE);
+
+        for (int i = 0; i < keys; i++) {
+            for (CacheConfiguration<Integer, V> ccfg : ccfgs)
+                ig.getOrCreateCache(ccfg.getName()).put(i, factory.apply(i));
+        }
+
+        forceCheckpoint();
+
+        return ig;
+    }
+
+    /**
+     * Starts grids with an empty static cache configuration and activates the cluster.
+     *
+     * @param grids Number of ignite instances.
+     * @return Coordinator ignite instance.
+     * @throws Exception If fails.
+     */
+    protected IgniteEx startGridsWithoutCache(int grids) throws Exception {
+        for (int idx = 0; idx < grids; idx++) {
+            IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(idx))
+                .setCacheConfiguration();
+
+            startGrid(optimize(cfg));
+        }
+
+        IgniteEx crd = grid(0);
+
+        crd.cluster().baselineAutoAdjustEnabled(false);
+        crd.cluster().state(ClusterState.ACTIVE);
+
+        return crd;
+    }
+
+    /**
+     * Starts grids from a snapshot stored in the directory returned by
+     * {@code resolveSnapshotWorkDirectory} and activates the cluster.
+     *
+     * @param cnt Number of grids to start.
+     * @param snpName Snapshot to start grids from.
+     * @return Coordinator ignite instance.
+     * @throws Exception If fails.
+     */
+    protected IgniteEx startGridsFromSnapshot(int cnt, String snpName) throws Exception {
+        Function<IgniteConfiguration, String> snpPath = cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath();
+
+        return startGridsFromSnapshot(cnt, snpPath, snpName, true);
+    }
+
+    /**
+     * Starts grids whose work directories point to the given snapshot.
+     *
+     * @param cnt Number of grids to start, must be positive.
+     * @param path Snapshot path resolver.
+     * @param snpName Snapshot to start grids from.
+     * @param activate {@code true} to activate the cluster once all grids are started.
+     * @return Coordinator ignite instance (the first started grid).
+     * @throws Exception If fails.
+     */
+    protected IgniteEx startGridsFromSnapshot(int cnt,
+        Function<IgniteConfiguration, String> path,
+        String snpName,
+        boolean activate
+    ) throws Exception {
+        // At least one grid is required: the first started grid is used as the coordinator below.
+        if (cnt <= 0)
+            throw new IllegalArgumentException("Number of grids to start must be positive: " + cnt);
+
+        IgniteEx crd = null;
+
+        for (int i = 0; i < cnt; i++) {
+            IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(i)));
+
+            cfg.setWorkDirectory(Paths.get(path.apply(cfg), snpName).toString());
+
+            IgniteEx ig = startGrid(cfg);
+
+            if (crd == null)
+                crd = ig;
+        }
+
+        crd.cluster().baselineAutoAdjustEnabled(false);
+
+        if (activate)
+            crd.cluster().state(ACTIVE);
+
+        return crd;
+    }
+
+    /**
+     * Resolves the snapshot manager from the shared cache context of a node.
+     *
+     * @param ignite Ignite instance.
+     * @return Snapshot manager related to given ignite instance.
+     */
+    public static IgniteSnapshotManager snp(IgniteEx ignite) {
+        return ignite.context()
+            .cache()
+            .context()
+            .snapshotMgr();
+    }
+
+    /**
+     * Resolves the persistence folder name of the given node via its PDS folder resolver.
+     *
+     * @param ignite Ignite instance.
+     * @return Directory name for ignite instance.
+     * @throws IgniteCheckedException If fails.
+     */
+    public static String folderName(IgniteEx ignite) throws IgniteCheckedException {
+        return ignite.context()
+            .pdsFolderResolver()
+            .resolveFolders()
+            .folderName();
+    }
+
+    /**
+     * Checks that the given cache contains every key from {@code [0, CACHE_KEYS_RANGE)}.
+     * Keys outside of this range are ignored.
+     *
+     * @param cache Ignite cache to check.
+     */
+    protected static void assertSnapshotCacheKeys(IgniteCache<?, ?> cache) {
+        // Hash-based removal is O(1) and, unlike List, has no remove(int) / remove(Object) overload trap.
+        Collection<Integer> keys = IntStream.range(0, CACHE_KEYS_RANGE).boxed().collect(Collectors.toSet());
+
+        cache.query(new ScanQuery<>(null))
+            .forEach(e -> keys.remove(e.getKey()));
+
+        assertTrue("Snapshot must contains pre-created cache data " +
+            "[cache=" + cache.getName() + ", keysLeft=" + keys + ']', keys.isEmpty());
+    }
+
+    /**
+     * Casts the injected discovery SPI of the given node to {@link BlockingCustomMessageDiscoverySpi}.
+     *
+     * @param ignite Ignite instance to resolve discovery spi to.
+     * @return BlockingCustomMessageDiscoverySpi instance.
+     */
+    protected static BlockingCustomMessageDiscoverySpi discoSpi(IgniteEx ignite) {
+        Object injected = ignite.context().discovery().getInjectedDiscoverySpi();
+
+        return (BlockingCustomMessageDiscoverySpi)injected;
+    }
+
+    /**
+     * Discovery SPI which holds back outgoing discovery custom messages matching a predicate
+     * and sends them later on {@link #unblock()}.
+     */
+    protected static class BlockingCustomMessageDiscoverySpi extends TcpDiscoverySpi {
+        /** List of messages which have been blocked. */
+        private final List<DiscoverySpiCustomMessage> blocked = new CopyOnWriteArrayList<>();
+
+        /** Discovery custom message filter. */
+        private volatile IgnitePredicate<DiscoveryCustomMessage> blockPred;
+
+        /** {@inheritDoc} */
+        @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+            if (msg instanceof CustomMessageWrapper) {
+                DiscoveryCustomMessage msg0 = ((CustomMessageWrapper)msg).delegate();
+
+                // Read the volatile predicate once: a concurrent unblock() may null it between check and use.
+                IgnitePredicate<DiscoveryCustomMessage> pred = blockPred;
+
+                if (pred != null && pred.apply(msg0)) {
+                    blocked.add(msg);
+
+                    if (log.isInfoEnabled())
+                        log.info("Discovery message has been blocked: " + msg0);
+
+                    return;
+                }
+            }
+
+            super.sendCustomEvent(msg);
+        }
+
+        /**
+         * Starts blocking discovery custom messages.
+         *
+         * @param pred Predicate selecting the messages to block.
+         */
+        public synchronized void block(IgnitePredicate<DiscoveryCustomMessage> pred) {
+            blockPred = pred;
+        }
+
+        /** Stops blocking and sends all previously saved discovery custom messages. */
+        public synchronized void unblock() {
+            blockPred = null;
+
+            for (DiscoverySpiCustomMessage msg : blocked)
+                sendCustomEvent(msg);
+
+            blocked.clear();
+        }
+
+        /**
+         * Waits until at least one discovery custom message has been blocked.
+         *
+         * @param timeout Timeout to wait blocking messages.
+         * @throws IgniteInterruptedCheckedException If interrupted.
+         * @throws AssertionError If no message has been blocked within the timeout.
+         */
+        public void waitBlocked(long timeout) throws IgniteInterruptedCheckedException {
+            if (!GridTestUtils.waitForCondition(() -> !blocked.isEmpty(), timeout))
+                throw new AssertionError("No discovery custom message has been blocked [timeout=" + timeout + ']');
+        }
+    }
+
+    /** Snapshot sender which forwards every call to another sender. */
+    protected static class DelegateSnapshotSender extends SnapshotSender {
+        /** Sender all calls are forwarded to. */
+        protected final SnapshotSender delegate;
+
+        /**
+         * @param log Ignite logger passed to the parent sender.
+         * @param exec Executor passed to the parent sender.
+         * @param delegate Sender all calls are forwarded to.
+         */
+        public DelegateSnapshotSender(IgniteLogger log, Executor exec, SnapshotSender delegate) {
+            super(log, exec);
+
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void init(int partsCnt) {
+            delegate.init(partsCnt);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendCacheConfig0(File ccfgFile, String dirName) {
+            delegate.sendCacheConfig(ccfgFile, dirName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMarshallerMeta0(List<Map<Integer, MappedName>> maps) {
+            delegate.sendMarshallerMeta(maps);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendBinaryMeta0(Collection<BinaryType> binTypes) {
+            delegate.sendBinaryMeta(binTypes);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendPart0(File partFile, String dirName, GroupPartitionId grpPartId, Long len) {
+            delegate.sendPart(partFile, dirName, grpPartId, len);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendDelta0(File deltaFile, String dirName, GroupPartitionId grpPartId) {
+            delegate.sendDelta(deltaFile, dirName, grpPartId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close0(Throwable err) {
+            delegate.close(err);
+        }
+    }
+
+    /** Account with an indexed identifier and a mutable balance. */
+    protected static class Account implements Serializable {
+        /** Serial version. */
+        private static final long serialVersionUID = 0L;
+
+        /** User id. */
+        @QuerySqlField(index = true)
+        private final int id;
+
+        /** User balance. */
+        @QuerySqlField
+        protected int balance;
+
+        /**
+         * @param id User id.
+         * @param balance User balance.
+         */
+        public Account(int id, int balance) {
+            this.id = id;
+            this.balance = balance;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            if (o == null || o.getClass() != getClass())
+                return false;
+
+            Account other = (Account)o;
+
+            return balance == other.balance && id == other.id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(id, balance);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Account.class, this);
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
new file mode 100644
index 0000000..7fe818a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
@@ -0,0 +1,945 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.OpenOption;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.ObjectGauge;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.FullMessage;
+import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest.checkPartitionMapExchangeFinished;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METRICS;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_IN_PROGRESS_ERR_MSG;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_NODE_STOPPING_ERR_MSG;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.isSnapshotOperation;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+
+/**
+ * Cluster-wide snapshot test.
+ */
+public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
+    /** Time to wait for a rebalance that is expected NOT to start; computed via {@code GridTestUtils.SF.applyLB}. */
+    private static final long REBALANCE_AWAIT_TIME = GridTestUtils.SF.applyLB(10_000, 3_000);
+
+    /**
+     * Atomic cache configuration (2 backups, {@code CACHE_PARTS_COUNT} partitions) used by the tests.
+     * NOTE(review): static and non-final, so it is shared by all test methods; confirm no test mutates it.
+     */
+    private static CacheConfiguration<Integer, Integer> atomicCcfg = new CacheConfiguration<Integer, Integer>("atomicCacheName")
+        .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+        .setBackups(2)
+        .setAffinity(new RendezvousAffinityFunction(false, CACHE_PARTS_COUNT));
+
+    /** {@code true} if node should be started in separate jvm; reset to {@code false} before each test. */
+    protected volatile boolean jvm;
+
+    /**
+     * Runs the common snapshot test preparation and resets the separate-JVM flag.
+     *
+     * @throws Exception If fails.
+     */
+    @Before
+    @Override public void beforeTestSnapshot() throws Exception {
+        super.beforeTestSnapshot();
+
+        // Tests that need a node in a separate JVM switch this flag on explicitly.
+        jvm = false;
+    }
+
+    /**
+     * Take snapshot from the whole cluster and check snapshot consistency when the
+     * cluster tx load starts on a new topology version.
+     * Note: Client nodes and server nodes not in baseline topology must not be affected.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testConsistentClusterSnapshotLoadNewTopology() throws Exception {
+        int grids = 3;
+        String snpName = "backup23012020";
+        AtomicInteger atKey = new AtomicInteger(CACHE_KEYS_RANGE);
+        AtomicInteger txKey = new AtomicInteger(CACHE_KEYS_RANGE);
+
+        IgniteEx ignite = startGrids(grids);
+        startClientGrid();
+
+        ignite.cluster().baselineAutoAdjustEnabled(false);
+        ignite.cluster().state(ACTIVE);
+
+        // Start node not in baseline.
+        IgniteEx notBltIgnite = startGrid(grids);
+        // Must resolve the directory of the snapshot actually created below, otherwise the check is vacuous.
+        File locSnpDir = snp(notBltIgnite).snapshotLocalDir(snpName);
+        String notBltDirName = folderName(notBltIgnite);
+
+        IgniteCache<Integer, Integer> atCache = ignite.createCache(atomicCcfg);
+
+        for (int idx = 0; idx < CACHE_KEYS_RANGE; idx++) {
+            atCache.put(atKey.incrementAndGet(), -1);
+            ignite.cache(DEFAULT_CACHE_NAME).put(txKey.incrementAndGet(), -1);
+        }
+
+        forceCheckpoint();
+
+        CountDownLatch loadLatch = new CountDownLatch(1);
+
+        ignite.context().cache().context().exchange().registerExchangeAwareComponent(new PartitionsExchangeAware() {
+            /** {@inheritDoc} */
+            @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                if (fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT)
+                    return;
+
+                // First discovery custom event will be a snapshot operation.
+                assertTrue(isSnapshotOperation(fut.firstEvent()));
+                assertTrue("Snapshot must use pme-free exchange", fut.context().exchangeFreeSwitch());
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                if (fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT)
+                    return;
+
+                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)fut.firstEvent()).customMessage();
+
+                assertNotNull(msg);
+
+                if (msg instanceof SnapshotDiscoveryMessage)
+                    loadLatch.countDown();
+            }
+        });
+
+        // Start cache load.
+        IgniteInternalFuture<Long> loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                U.await(loadLatch);
+
+                while (!Thread.currentThread().isInterrupted()) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int atIdx = rnd.nextInt(grids);
+
+                    // Zero out the sign bit.
+                    grid(atIdx).cache(atomicCcfg.getName()).put(atKey.incrementAndGet(), rnd.nextInt() & Integer.MAX_VALUE);
+
+                    int txIdx = rnd.nextInt(grids);
+
+                    grid(txIdx).cache(DEFAULT_CACHE_NAME).put(txKey.incrementAndGet(), rnd.nextInt() & Integer.MAX_VALUE);
+                }
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw new RuntimeException(e);
+            }
+        }, 3, "cache-put-");
+
+        try {
+            IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(snpName);
+
+            U.await(loadLatch, 10, TimeUnit.SECONDS);
+
+            fut.get();
+        }
+        finally {
+            loadFut.cancel();
+        }
+
+        // Cluster can be deactivated but we must test snapshot restore when binary recovery also occurred.
+        stopAllGrids();
+
+        assertTrue("Snapshot directory must be empty for node not in baseline topology: " + notBltDirName,
+            !searchDirectoryRecursively(locSnpDir.toPath(), notBltDirName).isPresent());
+
+        IgniteEx snpIg0 = startGridsFromSnapshot(grids, snpName);
+
+        assertEquals("The number of all (primary + backup) cache keys mismatch for cache: " + DEFAULT_CACHE_NAME,
+            CACHE_KEYS_RANGE, snpIg0.cache(DEFAULT_CACHE_NAME).size());
+
+        assertEquals("The number of all (primary + backup) cache keys mismatch for cache: " + atomicCcfg.getName(),
+            CACHE_KEYS_RANGE, snpIg0.cache(atomicCcfg.getName()).size());
+
+        snpIg0.cache(DEFAULT_CACHE_NAME).query(new ScanQuery<>(null))
+            .forEach(e -> assertTrue("Snapshot must contains only negative values " +
+                "[cache=" + DEFAULT_CACHE_NAME + ", entry=" + e + ']', (Integer)e.getValue() < 0));
+
+        snpIg0.cache(atomicCcfg.getName()).query(new ScanQuery<>(null))
+            .forEach(e -> assertTrue("Snapshot must contains only negative values " +
+                "[cache=" + atomicCcfg.getName() + ", entry=" + e + ']', (Integer)e.getValue() < 0));
+    }
+
+    /**
+     * Takes a snapshot under tx and atomic load and checks that primary and backup partitions
+     * restored from it are identical, so no rebalance is requested after activation.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testSnapshotPrimaryBackupsTheSame() throws Exception {
+        int grids = 3;
+        AtomicInteger cacheKey = new AtomicInteger();
+
+        IgniteEx ignite = startGridsWithCache(grids, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<Long> txLoadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                int gId = rnd.nextInt(grids);
+
+                IgniteCache<Integer, Integer> txCache = grid(gId).getOrCreateCache(dfltCacheCfg.getName());
+
+                try (Transaction tx = grid(gId).transactions().txStart()) {
+                    txCache.put(cacheKey.incrementAndGet(), 0);
+
+                    txCache.put(cacheKey.incrementAndGet(), 1);
+
+                    tx.commit();
+                }
+            }
+        }, 5, "tx-cache-put-");
+
+        IgniteInternalFuture<Long> atLoadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                IgniteCache<Integer, Integer> atomicCache = grid(rnd.nextInt(grids))
+                    .getOrCreateCache(atomicCcfg);
+
+                atomicCache.put(cacheKey.incrementAndGet(), 0);
+            }
+        }, 5, "atomic-cache-put-");
+
+        try {
+            IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+            fut.get();
+        }
+        finally {
+            atLoadFut.cancel();
+            txLoadFut.cancel();
+        }
+
+        stopAllGrids();
+
+        IgniteEx snpIg0 = startGridsFromSnapshot(grids, cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Block whole rebalancing.
+        for (Ignite g : G.allGrids())
+            TestRecordingCommunicationSpi.spi(g).blockMessages((node, msg) -> msg instanceof GridDhtPartitionDemandMessage);
+
+        snpIg0.cluster().state(ACTIVE);
+
+        assertFalse("Primary and backup in snapshot must have the same counters. Rebalance must not happen.",
+            GridTestUtils.waitForCondition(() -> {
+                boolean hasMsgs = false;
+
+                for (Ignite g : G.allGrids())
+                    hasMsgs |= TestRecordingCommunicationSpi.spi(g).hasBlockedMessages();
+
+                return hasMsgs;
+            }, REBALANCE_AWAIT_TIME));
+
+        TestRecordingCommunicationSpi.stopBlockAll();
+
+        assertPartitionsSame(idleVerify(snpIg0, dfltCacheCfg.getName(), atomicCcfg.getName()));
+    }
+
+    /**
+     * Transfers amounts between accounts of two transactional caches while a snapshot is taken and checks
+     * that the total of all balances is preserved both in the live cluster and in the restored snapshot.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotConsistencyUnderLoad() throws Exception {
+        int clientsCnt = 50;
+        int balance = 10_000;
+        int transferLimit = 1000;
+        // Every client has an account with the same initial balance in each of the two caches.
+        int total = clientsCnt * balance * 2;
+        int grids = 3;
+        int transferThreadCnt = 4;
+        AtomicBoolean stop = new AtomicBoolean(false);
+        CountDownLatch txStarted = new CountDownLatch(1);
+
+        CacheConfiguration<Integer, Account> eastCcfg = txCacheConfig(new CacheConfiguration<>("east"));
+        CacheConfiguration<Integer, Account> westCcfg = txCacheConfig(new CacheConfiguration<>("west"));
+
+        startGridsWithCache(grids, clientsCnt, key -> new Account(key, balance), eastCcfg, westCcfg);
+
+        Ignite client = startClientGrid(grids);
+
+        assertEquals("The initial summary value in all caches is not correct.",
+            total, sumAllCacheValues(client, clientsCnt, eastCcfg.getName(), westCcfg.getName()));
+
+        forceCheckpoint();
+
+        IgniteInternalFuture<?> txLoadFut = GridTestUtils.runMultiThreadedAsync(
+            () -> {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                try {
+                    while (!stop.get()) {
+                        IgniteEx ignite = grid(rnd.nextInt(grids));
+                        IgniteCache<Integer, Account> east = ignite.cache("east");
+                        IgniteCache<Integer, Account> west = ignite.cache("west");
+
+                        int amount = rnd.nextInt(transferLimit);
+
+                        txStarted.countDown();
+
+                        try (Transaction tx = ignite.transactions().txStart()) {
+                            int id = rnd.nextInt(clientsCnt);
+
+                            Account acc0 = east.get(id);
+                            Account acc1 = west.get(id);
+
+                            acc0.balance -= amount;
+                            acc1.balance += amount;
+
+                            east.put(id, acc0);
+                            west.put(id, acc1);
+
+                            tx.commit();
+                        }
+                    }
+                }
+                catch (Throwable e) {
+                    U.error(log, e);
+
+                    fail("Tx must not be failed.");
+                }
+            }, transferThreadCnt, "transfer-account-thread-");
+
+        try {
+            U.await(txStarted);
+
+            grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
+        }
+        finally {
+            stop.set(true);
+        }
+
+        txLoadFut.get();
+
+        assertEquals("The summary value should not changed during tx transfers.",
+            total, sumAllCacheValues(client, clientsCnt, eastCcfg.getName(), westCcfg.getName()));
+
+        stopAllGrids();
+
+        IgniteEx snpIg0 = startGridsFromSnapshot(grids, SNAPSHOT_NAME);
+
+        assertEquals("The total amount of all cache values must not changed in snapshot.",
+            total, sumAllCacheValues(snpIg0, clientsCnt, eastCcfg.getName(), westCcfg.getName()));
+    }
+
+    /**
+     * Checks that a cache with a node filter is restored from a snapshot with the same filter and data.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotWithCacheNodeFilter() throws Exception {
+        int grids = 4;
+
+        CacheConfiguration<Integer, Integer> ccfg = txCacheConfig(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME))
+            .setNodeFilter(node -> node.consistentId().toString().endsWith("1"));
+
+        IgniteEx ig0 = startGridsWithoutCache(grids);
+
+        for (int key = 0; key < CACHE_KEYS_RANGE; key++)
+            ig0.getOrCreateCache(ccfg).put(key, key);
+
+        ig0.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        // The resolver also clears the node's static cache configurations before resolving the path.
+        Function<IgniteConfiguration, String> snpPath =
+            cfg -> resolveSnapshotWorkDirectory(cfg.setCacheConfiguration()).getAbsolutePath();
+
+        IgniteEx snpIgnite = startGridsFromSnapshot(grids, snpPath, SNAPSHOT_NAME, true);
+
+        awaitPartitionMapExchange();
+        checkCacheDiscoveryDataConsistent();
+
+        int grpId = CU.cacheId(ccfg.getName());
+
+        CacheGroupDescriptor descr = snpIgnite.context().cache().cacheGroupDescriptors().get(grpId);
+
+        assertNotNull(descr);
+        assertNotNull(descr.config().getNodeFilter());
+        assertEquals(ccfg.getNodeFilter().apply(grid(1).localNode()),
+            descr.config().getNodeFilter().apply(grid(1).localNode()));
+        assertSnapshotCacheKeys(snpIgnite.cache(ccfg.getName()));
+    }
+
+    /**
+     * Checks that new caches can be created but existing caches cannot be destroyed
+     * while a cluster snapshot operation is in progress.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testRejectCacheStopDuringClusterSnapshot() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        // Block the full message, so cluster-wide snapshot operation would not be fully completed.
+        BlockingCustomMessageDiscoverySpi spi = discoSpi(ignite);
+        spi.block((msg) -> {
+            if (msg instanceof FullMessage) {
+                FullMessage<?> msg0 = (FullMessage<?>)msg;
+
+                assertEquals("Snapshot distributed process must be used",
+                    DistributedProcess.DistributedProcessType.START_SNAPSHOT.ordinal(), msg0.type());
+
+                assertTrue("Snapshot has to be finished successfully on all nodes", msg0.error().isEmpty());
+
+                return true;
+            }
+
+            return false;
+        });
+
+        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        spi.waitBlocked(10_000L);
+
+        // Creating of new caches should not be blocked. Use a copy to keep the shared default configuration intact.
+        ignite.getOrCreateCache(new CacheConfiguration<>(dfltCacheCfg).setName("default2"))
+            .put(1, 1);
+
+        forceCheckpoint();
+
+        assertThrowsAnyCause(log,
+            () -> {
+                ignite.destroyCache(DEFAULT_CACHE_NAME);
+
+                return 0;
+            },
+            IgniteCheckedException.class,
+            SNP_IN_PROGRESS_ERR_MSG);
+
+        spi.unblock();
+
+        fut.get();
+    }
+
+    /**
+     * Checks that non-baseline server join/leave, client join and a baseline change during
+     * a cluster snapshot do not break it, and that the non-baseline node gets no snapshot data.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testBltChangeDuringClusterSnapshot() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        startGrid(3);
+
+        long topVer = ignite.cluster().topologyVersion();
+
+        BlockingCustomMessageDiscoverySpi spi = discoSpi(ignite);
+        spi.block((msg) -> msg instanceof FullMessage);
+
+        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        spi.waitBlocked(10_000L);
+
+        // Not baseline node joins successfully.
+        String grid4Dir = folderName(startGrid(4));
+
+        // Not blt node left the cluster and snapshot not affected.
+        stopGrid(4);
+
+        // Client node must connect successfully.
+        startClientGrid(4);
+
+        // Changing baseline complete successfully.
+        ignite.cluster().setBaselineTopology(topVer);
+
+        spi.unblock();
+
+        fut.get();
+
+        assertTrue("Snapshot directory must be empty for node not in baseline topology: " + grid4Dir,
+            !searchDirectoryRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(), grid4Dir).isPresent());
+    }
+
+    /**
+     * Checks that the snapshot future fails with {@link NodeStoppingException} when the initiator node stops
+     * before the operation completes.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotExOnInitiatorLeft() throws Exception {
+        IgniteEx initiator = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        BlockingCustomMessageDiscoverySpi blockingSpi = discoSpi(initiator);
+        blockingSpi.block(msg -> msg instanceof FullMessage);
+
+        IgniteFuture<Void> snpFut = initiator.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        blockingSpi.waitBlocked(10_000L);
+
+        initiator.close();
+
+        assertThrowsAnyCause(log, snpFut::get, NodeStoppingException.class, SNP_NODE_STOPPING_ERR_MSG);
+    }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testSnapshotExistsException() throws Exception {
+ IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+ assertThrowsAnyCause(log,
+ () -> ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(),
+ IgniteException.class,
+ "Snapshot with given name already exists on local node.");
+
+ stopAllGrids();
+
+ // Check that snapshot has not been accidentally deleted.
+ IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+ assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
+ }
+
+    /**
+     * Checks that snapshot files are removed on all nodes when a node leaves the cluster
+     * while the snapshot is being created.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotCleanedOnLeft() throws Exception {
+        CountDownLatch block = new CountDownLatch(1);
+        CountDownLatch partProcessed = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        IgniteEx ignite1 = grid(1);
+
+        File locSnpDir = snp(ignite).snapshotLocalDir(SNAPSHOT_NAME);
+        String dirNameIgnite0 = folderName(ignite);
+        String dirNameIgnite1 = folderName(ignite1);
+
+        // Node 1 uses a local sender built from the two latches (see blockingLocalSnapshotSender).
+        snp(ignite1).localSnapshotSenderFactory(blockingLocalSnapshotSender(ignite1, partProcessed, block));
+
+        // Block the single-node messages sent by node 1.
+        TestRecordingCommunicationSpi.spi(ignite1).blockMessages((node, msg) -> msg instanceof SingleNodeMessage);
+
+        IgniteFuture<?> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        U.await(partProcessed);
+
+        stopGrid(1);
+
+        block.countDown();
+
+        assertThrowsAnyCause(log, fut::get, IgniteCheckedException.class, "Snapshot creation has been finished with an error");
+
+        assertTrue("Snapshot directory must be empty for node 0 due to snapshot future fail: " + dirNameIgnite0,
+            !searchDirectoryRecursively(locSnpDir.toPath(), dirNameIgnite0).isPresent());
+
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        // The restarted node must have cleaned its snapshot directory.
+        assertTrue("Snapshot directory must be empty for node 1 due to snapshot future fail: " + dirNameIgnite1,
+            !searchDirectoryRecursively(locSnpDir.toPath(), dirNameIgnite1).isPresent());
+
+        List<String> allSnapshots = snp(ignite).localSnapshotNames();
+
+        assertTrue("Snapshot directory must be empty due to snapshot fail: " + allSnapshots, allSnapshots.isEmpty());
+    }
+
+    /**
+     * Halts the JVM of a separate-process node during snapshot creation and checks that its leftover
+     * snapshot files are cleaned up on restart and a new snapshot can be created afterwards.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testRecoveryClusterSnapshotJvmHalted() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        String grid0Dir = folderName(ignite);
+        String grid1Dir = folderName(grid(1));
+        File locSnpDir = snp(ignite).snapshotLocalDir(SNAPSHOT_NAME);
+
+        jvm = true;
+
+        IgniteConfiguration cfg2 = optimize(getConfiguration(getTestIgniteInstanceName(2)));
+
+        cfg2.getDataStorageConfiguration()
+            .setFileIOFactory(new HaltJvmFileIOFactory(new RandomAccessFileIOFactory(),
+                // Matches any file whose path contains the snapshot name, i.e. files under the snapshot directory.
+                (Predicate<File> & Serializable) file -> file.getAbsolutePath().contains(SNAPSHOT_NAME)));
+
+        startGrid(cfg2);
+
+        String grid2Dir = U.maskForFileName(cfg2.getConsistentId().toString());
+
+        jvm = false;
+
+        ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
+
+        awaitPartitionMapExchange();
+
+        assertThrowsAnyCause(log,
+            () -> ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(),
+            IgniteCheckedException.class,
+            "Snapshot creation has been finished with an error");
+
+        assertTrue("Snapshot directory must be empty: " + grid0Dir,
+            !searchDirectoryRecursively(locSnpDir.toPath(), grid0Dir).isPresent());
+
+        assertTrue("Snapshot directory must be empty: " + grid1Dir,
+            !searchDirectoryRecursively(locSnpDir.toPath(), grid1Dir).isPresent());
+
+        assertTrue("Snapshot directory must exist due to grid2 has been halted and cleanup not fully performed: " + grid2Dir,
+            searchDirectoryRecursively(locSnpDir.toPath(), grid2Dir).isPresent());
+
+        IgniteEx grid2 = startGrid(2);
+
+        assertTrue("Snapshot directory must be empty after recovery: " + grid2Dir,
+            !searchDirectoryRecursively(locSnpDir.toPath(), grid2Dir).isPresent());
+
+        awaitPartitionMapExchange();
+
+        assertTrue("Snapshot directory must be empty", snp(grid2).localSnapshotNames().isEmpty());
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
+            .get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
+    }
+
+    /**
+     * Checks that a snapshot started while rebalancing to a newly added baseline node
+     * restores with all cache keys present.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotWithRebalancing() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        TestRecordingCommunicationSpi crdSpi = TestRecordingCommunicationSpi.spi(ignite);
+
+        // Hold back supply messages sent by node 0, so rebalancing cannot finish before the snapshot starts.
+        crdSpi.blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
+
+        startGrid(2);
+
+        ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
+
+        crdSpi.waitForBlocked();
+
+        IgniteFuture<Void> snpFut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        crdSpi.stopBlock(true);
+
+        snpFut.get();
+
+        stopAllGrids();
+
+        IgniteEx snpIgnite = startGridsFromSnapshot(3, SNAPSHOT_NAME);
+
+        awaitPartitionMapExchange();
+        checkPartitionMapExchangeFinished();
+
+        assertSnapshotCacheKeys(snpIgnite.cache(dfltCacheCfg.getName()));
+    }
+
+    /**
+     * Checks snapshot creation and restore when the snapshot path is configured explicitly
+     * instead of using the default snapshot work directory.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotWithExplicitPath() throws Exception {
+        File exSnpDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "ex_snapshots", true);
+
+        try {
+            String exSnpPath = exSnpDir.getAbsolutePath();
+            IgniteEx ignite = null;
+
+            for (int idx = 0; idx < 2; idx++) {
+                IgniteConfiguration nodeCfg = optimize(getConfiguration(getTestIgniteInstanceName(idx)));
+
+                nodeCfg.setSnapshotPath(exSnpPath);
+
+                ignite = startGrid(nodeCfg);
+            }
+
+            ignite.cluster().baselineAutoAdjustEnabled(false);
+            ignite.cluster().state(ACTIVE);
+
+            IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+            for (int key = 0; key < CACHE_KEYS_RANGE; key++)
+                cache.put(key, key);
+
+            ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+            stopAllGrids();
+
+            IgniteEx snpIgnite = startGridsFromSnapshot(2, cfg -> exSnpPath, SNAPSHOT_NAME, true);
+
+            assertSnapshotCacheKeys(snpIgnite.cache(dfltCacheCfg.getName()));
+        }
+        finally {
+            stopAllGrids();
+
+            U.delete(exSnpDir);
+        }
+    }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotMetrics() throws Exception {
+ String newSnapshotName = SNAPSHOT_NAME + "_new";
+ CountDownLatch deltaApply = new CountDownLatch(1);
+ CountDownLatch deltaBlock = new CountDownLatch(1);
+ IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+ MetricRegistry mreg0 = ignite.context().metric().registry(SNAPSHOT_METRICS);
+
+ LongMetric startTime = mreg0.findMetric("LastSnapshotStartTime");
+ LongMetric endTime = mreg0.findMetric("LastSnapshotEndTime");
+ ObjectGauge<String> snpName = mreg0.findMetric("LastSnapshotName");
+ ObjectGauge<String> errMsg = mreg0.findMetric("LastSnapshotErrorMessage");
+ ObjectGauge<List<String>> snpList = mreg0.findMetric("LocalSnapshotNames");
+
+ // Snapshot process will be blocked when delta partition files processing starts.
+ snp(ignite).localSnapshotSenderFactory(
+ blockingLocalSnapshotSender(ignite, deltaApply, deltaBlock));
+
+ assertEquals("Snapshot start time must be undefined prior to snapshot operation started.",
+ 0, startTime.value());
+ assertEquals("Snapshot end time must be undefined to snapshot operation started.",
+ 0, endTime.value());
+ assertTrue("Snapshot name must not exist prior to snapshot operation started.", snpName.value().isEmpty());
+ assertTrue("Snapshot error message must null prior to snapshot operation started.", errMsg.value().isEmpty());
+ assertTrue("Snapshots on local node must not exist", snpList.value().isEmpty());
+
+ long cutoffStartTime = U.currentTimeMillis();
+
+ IgniteFuture<Void> fut0 = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+ U.await(deltaApply);
+
+ assertTrue("Snapshot start time must be set prior to snapshot operation started " +
+ "[startTime=" + startTime.value() + ", cutoffTime=" + cutoffStartTime + ']',
+ startTime.value() >= cutoffStartTime);
+ assertEquals("Snapshot end time must be zero prior to snapshot operation started.",
+ 0, endTime.value());
+ assertEquals("Snapshot name must be set prior to snapshot operation started.",
+ SNAPSHOT_NAME, snpName.value());
+ assertTrue("Snapshot error message must null prior to snapshot operation started.",
+ errMsg.value().isEmpty());
+
+ IgniteFuture<Void> fut1 = grid(1).snapshot().createSnapshot(newSnapshotName);
+
+ assertThrowsWithCause((Callable<Object>)fut1::get, IgniteException.class);
+
+ MetricRegistry mreg1 = grid(1).context().metric().registry(SNAPSHOT_METRICS);
+
+ LongMetric startTime1 = mreg1.findMetric("LastSnapshotStartTime");
+ LongMetric endTime1 = mreg1.findMetric("LastSnapshotEndTime");
+ ObjectGauge<String> snpName1 = mreg1.findMetric("LastSnapshotName");
+ ObjectGauge<String> errMsg1 = mreg1.findMetric("LastSnapshotErrorMessage");
+
+ assertTrue("Snapshot start time must be greater than zero for finished snapshot.",
+ startTime1.value() > 0);
+ assertEquals("Snapshot end time must zero for failed on start snapshots.",
+ 0, endTime1.value());
+ assertEquals("Snapshot name must be set when snapshot operation already finished.",
+ newSnapshotName, snpName1.value());
+ assertNotNull("Concurrent snapshot operation must failed.",
+ errMsg1.value());
+
+ deltaBlock.countDown();
+
+ fut0.get();
+
+ assertTrue("Snapshot start time must be greater than zero for finished snapshot.",
+ startTime.value() > 0);
+ assertTrue("Snapshot end time must be greater than zero for finished snapshot.",
+ endTime.value() > 0);
+ assertEquals("Snapshot name must be set when snapshot operation already finished.",
+ SNAPSHOT_NAME, snpName.value());
+ assertTrue("Concurrent snapshot operation must finished successfully.",
+ errMsg.value().isEmpty());
+ assertEquals("Only the first snapshot must be created and stored on disk.",
+ Collections.singletonList(SNAPSHOT_NAME), snpList.value());
+ }
+
+    /**
+     * Checks that a snapshot name containing characters outside of the allowed set is rejected.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotIncorrectNameFails() throws Exception {
+        IgniteEx node = startGridsWithCache(1, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        String invalidName = "--№=+.:(snapshot)";
+
+        assertThrowsAnyCause(log,
+            () -> node.snapshot().createSnapshot(invalidName).get(),
+            IllegalArgumentException.class,
+            "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+    }
+
+    /**
+     * Takes a snapshot while one of three started nodes is stopped, then restores it on two nodes
+     * and verifies the cache content and partition consistency.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotWithOfflineBlt() throws Exception {
+        IgniteEx node0 = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        // Bring one node down before the snapshot is taken.
+        stopGrid(2);
+
+        node0.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx restored = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        awaitPartitionMapExchange();
+
+        String cacheName = dfltCacheCfg.getName();
+
+        assertSnapshotCacheKeys(restored.cache(cacheName));
+        assertPartitionsSame(idleVerify(restored, cacheName));
+    }
+
+    /**
+     * Takes and restores a snapshot of two transactional caches that share one cache group.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotWithSharedCacheGroup() throws Exception {
+        String grpName = "group";
+
+        CacheConfiguration<Integer, Integer> txCfg1 = txCacheConfig(new CacheConfiguration<>("tx1"));
+        txCfg1.setGroupName(grpName);
+
+        CacheConfiguration<Integer, Integer> txCfg2 = txCacheConfig(new CacheConfiguration<>("tx2"));
+        txCfg2.setGroupName(grpName);
+
+        IgniteEx node = startGridsWithCache(3, CACHE_KEYS_RANGE, Integer::new, txCfg1, txCfg2);
+
+        node.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx restored = startGridsFromSnapshot(3, SNAPSHOT_NAME);
+
+        awaitPartitionMapExchange();
+
+        // Verify the content of both caches of the shared group.
+        assertSnapshotCacheKeys(restored.cache(txCfg1.getName()));
+        assertSnapshotCacheKeys(restored.cache(txCfg2.getName()));
+    }
+
+    /**
+     * Wraps the node's current local snapshot sender factory so that every produced sender pauses
+     * before handling a partition delta file.
+     *
+     * @param ignite Ignite instance.
+     * @param started Latch counted down as soon as a delta file reaches the sender.
+     * @param blocked Latch the sender waits on before it actually processes the delta file.
+     * @return Factory which produces local snapshot senders.
+     */
+    private Function<String, SnapshotSender> blockingLocalSnapshotSender(IgniteEx ignite,
+        CountDownLatch started,
+        CountDownLatch blocked
+    ) {
+        // Captured once: the factory that was installed at the moment of wrapping.
+        Function<String, SnapshotSender> dfltFactory = snp(ignite).localSnapshotSenderFactory();
+
+        return name -> new DelegateSnapshotSender(log, snp(ignite).snapshotExecutorService(), dfltFactory.apply(name)) {
+            @Override public void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair) {
+                String deltaName = delta.getName();
+
+                if (log.isInfoEnabled())
+                    log.info("Processing delta file has been blocked: " + deltaName);
+
+                started.countDown();
+
+                try {
+                    U.await(blocked);
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    throw new IgniteException("Interrupted by node stop", e);
+                }
+
+                if (log.isInfoEnabled())
+                    log.info("Latch released. Processing delta file continued: " + deltaName);
+
+                super.sendDelta0(delta, cacheDirName, pair);
+            }
+        };
+    }
+
+ /** {@inheritDoc} */
+ @Override protected boolean isMultiJvm() {
+ // Controlled by the `jvm` flag declared outside this view (NOTE(review): presumably a test parameter).
+ return jvm;
+ }
+
+    /**
+     * Sums the {@code balance} field of the {@link Account} values stored under keys {@code [0, keys)}
+     * in each of the given caches.
+     *
+     * @param ignite Ignite instance.
+     * @param keys Number of keys to read from each cache, starting from key {@code 0}.
+     * @param caches Cache names to read values.
+     * @return Summary value.
+     */
+    private static int sumAllCacheValues(Ignite ignite, int keys, String... caches) {
+        // Plain local accumulator: the method is single-threaded, so no atomic is needed.
+        int total = 0;
+
+        for (String name : caches) {
+            IgniteCache<Integer, Account> cache = ignite.cache(name);
+
+            for (int key = 0; key < keys; key++)
+                total += cache.get(key).balance;
+        }
+
+        return total;
+    }
+
+    /**
+     * I/O factory which halts the JVM when a file matching the given predicate is opened.
+     */
+    private static class HaltJvmFileIOFactory implements FileIOFactory {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Delegate factory. */
+        private final FileIOFactory delegate;
+
+        /**
+         * Condition to halt.
+         * NOTE(review): must itself be serializable if this factory is ever serialized (e.g. shipped to a
+         * remote JVM); a lambda targeting plain {@link Predicate} is not.
+         */
+        private final Predicate<File> pred;
+
+        /**
+         * @param delegate Delegate factory.
+         * @param pred Condition on the file being opened; when it matches, the JVM is halted.
+         */
+        public HaltJvmFileIOFactory(FileIOFactory delegate, Predicate<File> pred) {
+            this.delegate = delegate;
+            this.pred = pred;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+            // The file is opened through the delegate first, so a failing open surfaces as an IOException
+            // instead of halting the JVM.
+            FileIO fileIo = delegate.create(file, modes);
+
+            if (pred.test(file))
+                Runtime.getRuntime().halt(Ignition.KILL_EXIT_CODE);
+
+            return fileIo;
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java
new file mode 100644
index 0000000..0aaa789
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.MBeanException;
+import javax.management.ReflectionException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.mxbean.SnapshotMXBean;
+import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METRICS;
+
+/**
+ * Tests {@link SnapshotMXBean}: a snapshot started through the MBean must complete and be restorable.
+ */
+public class IgniteSnapshotMXBeanTest extends AbstractSnapshotSelfTest {
+    /** Timeout in milliseconds to wait for the snapshot started via the MBean to finish. */
+    private static final long SNAPSHOT_AWAIT_TIMEOUT = 10_000;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        // JMX exporter is needed so the snapshot metrics can be read through a DynamicMBean below.
+        return super.getConfiguration(igniteInstanceName)
+            .setMetricExporterSpi(new JmxMetricExporterSpi());
+    }
+
+    /**
+     * Starts a snapshot via {@link SnapshotMXBean#createSnapshot(String)}, waits for the
+     * {@code LastSnapshotEndTime} metric to be set and checks the cache content of nodes restored from it.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testCreateSnapshot() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        DynamicMBean snpMBean = metricRegistry(ignite.name(), null, SNAPSHOT_METRICS);
+
+        assertEquals("Snapshot end time must be undefined before the first snapshot operation starts.",
+            0, getLastSnapshotEndTime(snpMBean));
+
+        SnapshotMXBean mxBean = getMxBean(ignite.name(), "Snapshot", SnapshotMXBeanImpl.class, SnapshotMXBean.class);
+
+        mxBean.createSnapshot(SNAPSHOT_NAME);
+
+        // The test does not rely on createSnapshot() blocking: completion is detected by polling the end time.
+        assertTrue("Waiting for snapshot operation failed.",
+            GridTestUtils.waitForCondition(() -> getLastSnapshotEndTime(snpMBean) > 0, SNAPSHOT_AWAIT_TIMEOUT));
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
+    }
+
+    /**
+     * Reads the {@code LastSnapshotEndTime} attribute of the snapshot metrics MBean.
+     *
+     * @param mBean Ignite snapshot MBean.
+     * @return Value of snapshot end time.
+     * @throws RuntimeException If the attribute cannot be read.
+     */
+    private static long getLastSnapshotEndTime(DynamicMBean mBean) {
+        try {
+            return (long)mBean.getAttribute("LastSnapshotEndTime");
+        }
+        catch (MBeanException | ReflectionException | AttributeNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
new file mode 100644
index 0000000..b957d48
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
+import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.resolveBinaryWorkDir;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirName;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.CP_SNAPSHOT_REASON;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+
+/**
+ * Default snapshot manager test.
+ */
+public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
+    /**
+     * Takes a local snapshot of a single-node cache and checks that partition files, binary metadata and
+     * marshaller mappings in the snapshot match (by CRC) the node's own files after the node is stopped.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testSnapshotLocalPartitions() throws Exception {
+        IgniteEx ig = startGridsWithCache(1, 4096, key -> new Account(key, key),
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+        // NOTE(review): the anonymous Account subclass presumably registers an extra binary type,
+        // so that binary metadata is also exercised by the snapshot.
+        for (int i = 4096; i < 8192; i++) {
+            ig.cache(DEFAULT_CACHE_NAME).put(i, new Account(i, i) {
+                @Override public String toString() {
+                    return "_" + super.toString();
+                }
+            });
+        }
+
+        GridCacheSharedContext<?, ?> cctx = ig.context().cache().context();
+        IgniteSnapshotManager mgr = snp(ig);
+
+        // Map of cache group id to the partitions to snapshot; presumably null means all partitions of the group.
+        IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx,
+            SNAPSHOT_NAME,
+            F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME));
+
+        snpFut.get();
+
+        File cacheWorkDir = ((FilePageStoreManager)ig.context()
+            .cache()
+            .context()
+            .pageStore())
+            .cacheWorkDir(dfltCacheCfg);
+
+        // A checkpoint is forced on cluster deactivation (currently only a single node in the cluster),
+        // so the snapshot partitions and the partitions left after node stop must contain the same data.
+        stopGrid(ig.name());
+
+        // Calculate CRCs.
+        IgniteConfiguration cfg = ig.context().config();
+        PdsFolderSettings settings = ig.context().pdsFolderResolver().resolveFolders();
+        String nodePath = databaseRelativePath(settings.folderName());
+        File binWorkDir = resolveBinaryWorkDir(cfg.getWorkDirectory(), settings.folderName());
+        File marshWorkDir = mappingFileStoreWorkDir(U.workDirectory(cfg.getWorkDirectory(), cfg.getIgniteHome()));
+        File snpBinWorkDir = resolveBinaryWorkDir(mgr.snapshotLocalDir(SNAPSHOT_NAME).getAbsolutePath(), settings.folderName());
+        File snpMarshWorkDir = mappingFileStoreWorkDir(mgr.snapshotLocalDir(SNAPSHOT_NAME).getAbsolutePath());
+
+        final Map<String, Integer> origPartCRCs = calculateCRC32Partitions(cacheWorkDir);
+        final Map<String, Integer> snpPartCRCs = calculateCRC32Partitions(
+            FilePageStoreManager.cacheWorkDir(U.resolveWorkDirectory(mgr.snapshotLocalDir(SNAPSHOT_NAME)
+                    .getAbsolutePath(),
+                nodePath,
+                false),
+                cacheDirName(dfltCacheCfg)));
+
+        assertEquals("Partitions must have the same CRC after file copying and merging partition delta files",
+            origPartCRCs, snpPartCRCs);
+        assertEquals("Binary object mappings must be the same for local node and created snapshot",
+            calculateCRC32Partitions(binWorkDir), calculateCRC32Partitions(snpBinWorkDir));
+        assertEquals("Marshaller meta must be the same for local node and created snapshot",
+            calculateCRC32Partitions(marshWorkDir), calculateCRC32Partitions(snpMarshWorkDir));
+
+        File snpWorkDir = mgr.snapshotTmpDir();
+        File[] snpTmpFiles = snpWorkDir.listFiles();
+
+        // File#listFiles returns null (rather than throwing) when the directory is missing or unreadable.
+        assertNotNull("Snapshot working directory must exist: " + snpWorkDir, snpTmpFiles);
+        assertEquals("Snapshot working directory must be cleaned after usage", 0, snpTmpFiles.length);
+    }
+
+    /**
+     * Tests that all partitions are copied successfully even if multiple checkpoints occur during
+     * a long copy of cache partition files.
+     *
+     * Data consistency is checked through a node started right from the snapshot directory: all values
+     * must be read successfully and match the values written before the snapshot.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testSnapshotLocalPartitionMultiCpWithLoad() throws Exception {
+        int valMultiplier = 2;
+        CountDownLatch slowCopy = new CountDownLatch(1);
+
+        // Start a single node with initial data.
+        IgniteEx ig = startGridsWithCache(1, CACHE_KEYS_RANGE, key -> new Account(key, key),
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+        GridCacheSharedContext<?, ?> cctx = ig.context().cache().context();
+        AtomicInteger cntr = new AtomicInteger();
+        CountDownLatch ldrLatch = new CountDownLatch(1);
+        IgniteSnapshotManager mgr = snp(ig);
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)cctx.database();
+
+        // Background load; released by the checkpoint listener below once the task's "processed" map is non-empty.
+        IgniteInternalFuture<?> loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                U.await(ldrLatch);
+
+                while (!Thread.currentThread().isInterrupted())
+                    ig.cache(DEFAULT_CACHE_NAME).put(cntr.incrementAndGet(),
+                        new Account(cntr.incrementAndGet(), cntr.incrementAndGet()));
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                log.warning("Loader has been interrupted", e);
+            }
+        }, 5, "cache-loader-");
+
+        // Register task but do not schedule it on the checkpoint yet. Partition copying waits on slowCopy.
+        SnapshotFutureTask snpFutTask = mgr.registerSnapshotTask(SNAPSHOT_NAME,
+            cctx.localNodeId(),
+            F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) {
+                @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+                    try {
+                        U.await(slowCopy);
+
+                        delegate.sendPart0(part, cacheDirName, pair, length);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+            });
+
+        db.addCheckpointListener(new DbCheckpointListener() {
+            /** {@inheritDoc} */
+            @Override public void beforeCheckpointBegin(Context ctx) {
+                // No-op.
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onMarkCheckpointBegin(Context ctx) {
+                // No-op.
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onCheckpointBegin(Context ctx) {
+                Map<Integer, Set<Integer>> processed = GridTestUtils.getFieldValue(snpFutTask,
+                    SnapshotFutureTask.class,
+                    "processed");
+
+                if (!processed.isEmpty())
+                    ldrLatch.countDown();
+            }
+        });
+
+        try {
+            snpFutTask.start();
+
+            // Change data before snapshot creation which must be included into it with correct value multiplier.
+            for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+                ig.cache(DEFAULT_CACHE_NAME).put(i, new Account(i, valMultiplier * i));
+
+            // Snapshot is still in the INIT state. beforeCheckpoint has been skipped
+            // due to checkpoint already running and we need to schedule the next one
+            // right after current will be completed.
+            cctx.database().forceCheckpoint(String.format(CP_SNAPSHOT_REASON, SNAPSHOT_NAME));
+
+            snpFutTask.awaitStarted();
+
+            db.forceCheckpoint("snapshot is ready to be created")
+                .futureFor(CheckpointState.MARKER_STORED_TO_DISK)
+                .get();
+
+            // Change data after snapshot.
+            for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+                ig.cache(DEFAULT_CACHE_NAME).put(i, new Account(i, 3 * i));
+
+            // Snapshot on the next checkpoint must copy page to delta file before write it to a partition.
+            forceCheckpoint(ig);
+
+            slowCopy.countDown();
+
+            snpFutTask.get();
+        }
+        finally {
+            // Never leave partition copying parked if the test fails before the latch is released above.
+            slowCopy.countDown();
+
+            loadFut.cancel();
+        }
+
+        // Now can stop the node and check created snapshots.
+        stopGrid(0);
+
+        cleanPersistenceDir(ig.name());
+
+        // Start Ignite instance from snapshot directory.
+        IgniteEx ig2 = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++) {
+            assertEquals("snapshot data consistency violation [key=" + i + ']',
+                i * valMultiplier, ((Account)ig2.cache(DEFAULT_CACHE_NAME).get(i)).balance);
+        }
+    }
+
+    /**
+     * Checks that an I/O error while writing the partition delta file fails the snapshot task with that error.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testSnapshotLocalPartitionNotEnoughSpace() throws Exception {
+        String errMsg = "Test exception. Not enough space.";
+        AtomicInteger throwCntr = new AtomicInteger();
+        RandomAccessFileIOFactory ioFactory = new RandomAccessFileIOFactory();
+
+        IgniteEx ig = startGridWithCache(dfltCacheCfg.setAffinity(new ZeroPartitionAffinityFunction()),
+            CACHE_KEYS_RANGE);
+
+        // Overwrite all values before the snapshot is taken.
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+            ig.cache(DEFAULT_CACHE_NAME).put(i, 2 * i);
+
+        GridCacheSharedContext<?, ?> cctx0 = ig.context().cache().context();
+
+        IgniteSnapshotManager mgr = snp(ig);
+
+        // Fail the third write to the delta file of partition 0 (every key is mapped to that partition).
+        mgr.ioFactory(new FileIOFactory() {
+            @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+                FileIO fileIo = ioFactory.create(file, modes);
+
+                if (file.getName().equals(IgniteSnapshotManager.partDeltaFileName(0)))
+                    return new FileIODecorator(fileIo) {
+                        @Override public int writeFully(ByteBuffer srcBuf) throws IOException {
+                            if (throwCntr.incrementAndGet() == 3)
+                                throw new IOException(errMsg);
+
+                            return super.writeFully(srcBuf);
+                        }
+                    };
+
+                return fileIo;
+            }
+        });
+
+        IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx0,
+            SNAPSHOT_NAME,
+            F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME));
+
+        // Check the right exception thrown.
+        assertThrowsAnyCause(log,
+            snpFut::get,
+            IOException.class,
+            errMsg);
+    }
+
+    /**
+     * Checks that an exception thrown while sending a partition file fails the local snapshot task.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testSnapshotCreateLocalCopyPartitionFail() throws Exception {
+        String errMsg = "Test. Fail to copy partition: ";
+        IgniteEx ig = startGridWithCache(dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        Map<Integer, Set<Integer>> parts = new HashMap<>();
+        parts.put(CU.cacheId(DEFAULT_CACHE_NAME), new HashSet<>(Collections.singletonList(0)));
+
+        IgniteSnapshotManager mgr0 = snp(ig);
+
+        IgniteInternalFuture<?> fut = startLocalSnapshotTask(ig.context().cache().context(),
+            SNAPSHOT_NAME,
+            parts,
+            new DelegateSnapshotSender(log, mgr0.snapshotExecutorService(),
+                mgr0.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) {
+                @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+                    if (pair.getPartitionId() == 0)
+                        throw new IgniteException(errMsg + pair);
+
+                    delegate.sendPart0(part, cacheDirName, pair, length);
+                }
+            });
+
+        assertThrowsAnyCause(log,
+            fut::get,
+            IgniteException.class,
+            errMsg);
+    }
+
+    /**
+     * Checks that a local snapshot task fails when the cache is destroyed while its partition files
+     * are still waiting to be sent.
+     *
+     * @throws Exception If fails.
+     */
+    @Test(expected = IgniteCheckedException.class)
+    public void testLocalSnapshotOnCacheStopped() throws Exception {
+        IgniteEx ig = startGridWithCache(dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        startGrid(1);
+
+        ig.cluster().state(ClusterState.ACTIVE);
+
+        awaitPartitionMapExchange();
+
+        GridCacheSharedContext<?, ?> cctx0 = ig.context().cache().context();
+        IgniteSnapshotManager mgr = snp(ig);
+
+        CountDownLatch cpLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx0,
+            SNAPSHOT_NAME,
+            F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) {
+                @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+                    try {
+                        U.await(cpLatch);
+
+                        delegate.sendPart0(part, cacheDirName, pair, length);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+            });
+
+        IgniteCache<?, ?> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        cache.destroy();
+
+        cpLatch.countDown();
+
+        try {
+            snpFut.get(5_000, TimeUnit.MILLISECONDS);
+        }
+        catch (IgniteCheckedException e) {
+            // A timed get() also reports a timeout as an IgniteCheckedException, which would satisfy
+            // the expected exception on its own. Make sure the task really completed with an error.
+            assertTrue("Snapshot task must fail on cache stop instead of timing out: " + e, snpFut.isDone());
+
+            throw e;
+        }
+    }
+
+    /**
+     * NOTE(review): not referenced by any test in this class.
+     *
+     * @param src Source node to calculate.
+     * @param grps Groups to collect owning parts.
+     * @param rmtNodeId Remote node id.
+     * @return Map of collected parts.
+     */
+    private static Map<Integer, Set<Integer>> owningParts(IgniteEx src, Set<Integer> grps, UUID rmtNodeId) {
+        Map<Integer, Set<Integer>> result = new HashMap<>();
+
+        for (Integer grpId : grps) {
+            Set<Integer> parts = src.context()
+                .cache()
+                .cacheGroup(grpId)
+                .topology()
+                .partitions(rmtNodeId)
+                .entrySet()
+                .stream()
+                .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toSet());
+
+            result.put(grpId, parts);
+        }
+
+        return result;
+    }
+
+    /**
+     * Registers a local snapshot task, starts it and forces a checkpoint so that the task can begin processing.
+     *
+     * @param cctx Cache shared context of the node to take the snapshot on.
+     * @param snpName Unique snapshot name.
+     * @param parts Collection of pairs group and appropriate cache partition to be snapshot.
+     * @param snpSndr Sender which used for snapshot sub-task processing.
+     * @return Future which will be completed when snapshot is done.
+     * @throws IgniteCheckedException If registering or starting the snapshot task fails.
+     */
+    private static SnapshotFutureTask startLocalSnapshotTask(
+        GridCacheSharedContext<?, ?> cctx,
+        String snpName,
+        Map<Integer, Set<Integer>> parts,
+        SnapshotSender snpSndr
+    ) throws IgniteCheckedException {
+        SnapshotFutureTask snpFutTask = cctx.snapshotMgr().registerSnapshotTask(snpName, cctx.localNodeId(), parts, snpSndr);
+
+        snpFutTask.start();
+
+        // Snapshot is still in the INIT state. beforeCheckpoint has been skipped
+        // due to checkpoint already running and we need to schedule the next one
+        // right after current will be completed.
+        cctx.database().forceCheckpoint(String.format(CP_SNAPSHOT_REASON, snpName));
+
+        snpFutTask.awaitStarted();
+
+        return snpFutTask;
+    }
+
+    /** Affinity function which maps every key to partition {@code 0}. */
+    private static class ZeroPartitionAffinityFunction extends RendezvousAffinityFunction {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public int partition(Object key) {
+            return 0;
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
index 701c68d0..bd7b913 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
@@ -46,6 +46,7 @@ import org.apache.ignite.IgniteScheduler;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.IgniteSet;
+import org.apache.ignite.IgniteSnapshot;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.MemoryMetrics;
import org.apache.ignite.PersistenceMetrics;
@@ -616,6 +617,13 @@ public class IgfsIgniteMock implements IgniteEx {
}
/** {@inheritDoc} */
+ @Override public IgniteSnapshot snapshot() {
+ // This mock provides no snapshot facade: throwUnsupported() is called instead.
+ throwUnsupported();
+
+ // NOTE(review): presumably unreachable if throwUnsupported() always throws; kept to satisfy the compiler.
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<MemoryMetrics> memoryMetrics() {
return DataRegionMetricsAdapter.collectionOf(dataRegionMetrics());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 8c59e20..38da757 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -75,6 +75,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
null,
new WalStateManager(null),
new IgniteCacheDatabaseSharedManager(),
+ null,
new IgniteCacheSnapshotManager(),
new GridCacheDeploymentManager<K, V>(),
new GridCachePartitionExchangeManager<K, V>(),
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java
index 1c25310..ce7da5a 100644
--- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java
@@ -18,6 +18,11 @@
package org.apache.ignite.platform;
import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
@@ -34,12 +39,6 @@ import org.apache.ignite.services.ServiceContext;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
import static java.util.Calendar.JANUARY;
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index f4cc3ff..d0ee646 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -17,12 +17,6 @@
package org.apache.ignite.testframework;
-import javax.cache.CacheException;
-import javax.cache.configuration.Factory;
-import javax.management.Attribute;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -72,6 +66,12 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.CacheException;
+import javax.cache.configuration.Factory;
+import javax.management.Attribute;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -497,9 +497,8 @@ public final class GridTestUtils {
* @param cls Exception class.
* @param msg Exception message (optional). If provided exception message
* and this message should be equal.
- * @return Thrown throwable.
*/
- public static Throwable assertThrowsAnyCause(@Nullable IgniteLogger log, Callable<?> call,
+ public static void assertThrowsAnyCause(@Nullable IgniteLogger log, Callable<?> call,
Class<? extends Throwable> cls, @Nullable String msg) {
assert call != null;
assert cls != null;
@@ -515,7 +514,7 @@ public final class GridTestUtils {
if (log != null && log.isInfoEnabled())
log.info("Caught expected exception: " + t.getMessage());
- return t;
+ return;
}
t = t.getCause();
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index d025e38..949ec4e 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -158,9 +158,10 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.GridKernalState.DISCONNECTED;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.BINARY_META_FOLDER;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
-import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValueHierarchy;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
import static org.apache.ignite.testframework.config.GridTestProperties.BINARY_MARSHALLER_USE_SIMPLE_NAME_MAPPER;
import static org.apache.ignite.testframework.config.GridTestProperties.IGNITE_CFG_PREPROCESSOR_CLS;
@@ -643,7 +644,7 @@ public abstract class GridAbstractTest extends JUnitAssertAware {
*/
private void resolveWorkDirectory() throws Exception {
U.resolveWorkDirectory(U.defaultWorkDirectory(), "marshaller", true);
- U.resolveWorkDirectory(U.defaultWorkDirectory(), "binary_meta", true);
+ U.resolveWorkDirectory(U.defaultWorkDirectory(), BINARY_META_FOLDER, true);
}
/** */
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index 0448e9b..43bc9de 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -49,6 +49,7 @@ import org.apache.ignite.IgniteScheduler;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.IgniteSet;
+import org.apache.ignite.IgniteSnapshot;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.MemoryMetrics;
import org.apache.ignite.PersistenceMetrics;
@@ -494,6 +495,11 @@ public class IgniteMock implements Ignite {
}
/** {@inheritDoc} */
+ @Override public IgniteSnapshot snapshot() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<MemoryMetrics> memoryMetrics() {
return DataRegionMetricsAdapter.collectionOf(dataRegionMetrics());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 9394760..bd52959 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -138,6 +138,7 @@ import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.BINARY_META_FOLDER;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -843,7 +844,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
/**
* @param c Cache proxy.
*/
- protected void printPartitionState(IgniteCache<?, ?> c) {
+ protected static void printPartitionState(IgniteCache<?, ?> c) {
printPartitionState(c.getConfiguration(CacheConfiguration.class).getName(), 0);
}
@@ -853,7 +854,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
*
* Print partitionState for cache.
*/
- protected void printPartitionState(String cacheName, int firstParts) {
+ protected static void printPartitionState(String cacheName, int firstParts) {
StringBuilder sb = new StringBuilder();
sb.append("----preload sync futures----\n");
@@ -1863,7 +1864,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "cp", false));
U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false));
U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "marshaller", false));
- U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "binary_meta", false));
+ U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), BINARY_META_FOLDER, false));
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index 9932f1f..636f810 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -57,6 +57,7 @@ import org.apache.ignite.IgniteScheduler;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.IgniteSet;
+import org.apache.ignite.IgniteSnapshot;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.Ignition;
@@ -826,6 +827,11 @@ public class IgniteProcessProxy implements IgniteEx {
}
/** {@inheritDoc} */
+ @Override public IgniteSnapshot snapshot() {
+ throw new UnsupportedOperationException("Operation isn't supported yet.");
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<MemoryMetrics> memoryMetrics() {
return DataRegionMetricsAdapter.collectionOf(dataRegionMetrics());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
index 6d797b4..cf1c881 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
@@ -35,6 +35,9 @@ import org.apache.ignite.internal.encryption.MasterKeyChangeConsistencyCheckTest
import org.apache.ignite.internal.encryption.MasterKeyChangeTest;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointReadLockFailureTest;
import org.apache.ignite.internal.processors.cache.persistence.SingleNodePersistenceSslTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotSelfTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
import org.apache.ignite.marshaller.GridMarshallerMappingConsistencyTest;
import org.apache.ignite.util.GridCommandHandlerClusterByClassTest;
import org.apache.ignite.util.GridCommandHandlerClusterByClassWithSSLTest;
@@ -80,7 +83,11 @@ import org.junit.runners.Suite;
MasterKeyChangeTest.class,
MasterKeyChangeConsistencyCheckTest.class,
- EncryptionMXBeanTest.class
+ EncryptionMXBeanTest.class,
+
+ IgniteSnapshotManagerSelfTest.class,
+ IgniteClusterSnapshotSelfTest.class,
+ IgniteSnapshotMXBeanTest.class
})
public class IgniteBasicWithPersistenceTestSuite {
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java
index 0f54e30..76dd922 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java
@@ -156,11 +156,11 @@ public class SqlViewExporterSpiTest extends AbstractExporterSpiTest {
for (List<?> row : res) {
names.add((String)row.get(0));
- assertNotNull(row.get(1));
+ assertNotNull("Metric value must be not null [name=" + row.get(0) + ']', row.get(1));
}
for (String attr : EXPECTED_ATTRIBUTES)
- assertTrue(attr + " should be exporterd via SQL view", names.contains(attr));
+ assertTrue(attr + " should be exported via SQL view", names.contains(attr));
}
/** */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java
new file mode 100644
index 0000000..a993911
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.visor.verify.ValidateIndexesClosure;
+import org.apache.ignite.lang.IgniteFuture;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Test;
+
+import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CHECKPOINT_FREQ;
+
+/**
+ * Cluster-wide snapshot test with indexes.
+ */
+public class IgniteClusterSnapshotWithIndexesTest extends AbstractSnapshotSelfTest {
+    /** Configuration with statically configured indexes. */
+    private final CacheConfiguration<Integer, Account> indexedCcfg =
+        txCacheConfig(new CacheConfiguration<Integer, Account>("indexed"))
+            .setQueryEntities(Collections.singletonList(
+                new QueryEntity(Integer.class.getName(), Account.class.getName())
+                    .addQueryField("id", Integer.class.getName(), null)
+                    .addQueryField("balance", Integer.class.getName(), null)
+                    .setIndexes(F.asList(new QueryIndex("id"),
+                        new QueryIndex("balance")))));
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        // Reset the checkpoint frequency to its default value.
+        cfg.getDataStorageConfiguration().setCheckpointFrequency(DFLT_CHECKPOINT_FREQ);
+
+        return cfg;
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithIndexes() throws Exception {
+        String tblName = "Person";
+        int grids = 3;
+
+        IgniteEx ignite = startGridsWithCache(grids, CACHE_KEYS_RANGE, key -> new Account(key, key), indexedCcfg);
+
+        executeSql(ignite, "CREATE TABLE " + tblName + " (id int, name varchar, age int, city varchar, " +
+            "primary key (id, name)) WITH \"cache_name=" + tblName + "\"");
+        executeSql(ignite, "CREATE INDEX ON " + tblName + "(city, age)");
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+            executeSql(ignite, "INSERT INTO " + tblName + " (id, name, age, city) VALUES(?, 'name', 3, 'city')", i);
+
+        assertEquals(CACHE_KEYS_RANGE, rowsCount(executeSql(ignite, selectCountSQLStatement(tblName))));
+        assertEquals(CACHE_KEYS_RANGE, rowsCount(executeSql(ignite.context().cache().jcache(indexedCcfg.getName()),
+            selectCountSQLStatement(Account.class.getSimpleName()))));
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(grids, SNAPSHOT_NAME);
+
+        assertTrue(snp.cache(indexedCcfg.getName()).indexReadyFuture().isDone());
+        assertTrue(snp.cache(tblName).indexReadyFuture().isDone());
+
+        List<List<?>> results = executeSql(snp, explainSQLStatement(tblName) + "id > 10");
+
+        // Primary key index must be used and no _SCAN_ index must appear in the plan.
+        String explainPlan = (String)results.get(0).get(0);
+        assertTrue(explainPlan, explainPlan.toUpperCase().contains("\"_KEY_PK"));
+        assertFalse(explainPlan, explainPlan.toUpperCase().contains("_SCAN_"));
+
+        results = executeSql(snp, explainSQLStatement(tblName) + "city='city' and age=2");
+        assertUsingSecondaryIndex(results);
+
+        results = executeSql(snp.context().cache().jcache(indexedCcfg.getName()),
+            explainSQLStatement(Account.class.getSimpleName()) + "id=0");
+        assertUsingSecondaryIndex(results);
+
+        assertEquals(CACHE_KEYS_RANGE, rowsCount(executeSql(snp, selectCountSQLStatement(tblName))));
+        assertEquals(CACHE_KEYS_RANGE, rowsCount(executeSql(snp.context().cache().jcache(indexedCcfg.getName()),
+            selectCountSQLStatement(Account.class.getSimpleName()))));
+
+        forceCheckpoint();
+
+        // Validate indexes of the caches restored from the snapshot on every node.
+        ValidateIndexesClosure clo = new ValidateIndexesClosure(new HashSet<>(Arrays.asList(indexedCcfg.getName(), tblName)),
+            0, 0, false, true);
+
+        for (Ignite node : G.allGrids()) {
+            ((IgniteEx)node).context().resource().injectGeneric(clo);
+
+            assertFalse("Index validation failed [node=" + node.name() + ']', clo.call().hasIssues());
+        }
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotConsistentConfig() throws Exception {
+        String tblName = "PersonCache";
+        int grids = 3;
+
+        IgniteEx ignite = startGridsWithoutCache(grids);
+
+        executeSql(ignite, "CREATE TABLE " + tblName + " (id int, name varchar, age int, city varchar, " +
+            "primary key (id, name)) WITH \"cache_name=" + tblName + "\"");
+        executeSql(ignite, "CREATE INDEX SNP_IDX_0 ON " + tblName + "(age)");
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+            executeSql(ignite, "INSERT INTO " + tblName + " (id, name, age, city) VALUES(?, 'name', 3, 'city')", i);
+
+        // Hold back tasks of the local snapshot sender until the concurrent index creation below is done.
+        List<BlockingExecutor> execs = new ArrayList<>();
+
+        for (Ignite grid : G.allGrids()) {
+            IgniteSnapshotManager mgr = snp((IgniteEx)grid);
+            Function<String, SnapshotSender> old = mgr.localSnapshotSenderFactory();
+
+            BlockingExecutor block = new BlockingExecutor(mgr.snapshotExecutorService());
+            execs.add(block);
+
+            mgr.localSnapshotSenderFactory((snpName) ->
+                new DelegateSnapshotSender(log, block, old.apply(snpName)));
+        }
+
+        List<String> idxNames = Arrays.asList("SNP_IDX_1", "SNP_IDX_2");
+
+        IgniteFuture<Void> fut;
+
+        try {
+            fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+            executeSql(ignite, "CREATE INDEX " + idxNames.get(0) + " ON " + tblName + "(city)");
+            executeSql(ignite, "CREATE INDEX " + idxNames.get(1) + " ON " + tblName + "(age, city)");
+        }
+        finally {
+            // Release held tasks even if the statements above fail; queued tasks are never run otherwise.
+            for (BlockingExecutor exec : execs)
+                exec.unblock();
+        }
+
+        fut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(grids, SNAPSHOT_NAME);
+
+        List<String> currIdxNames = executeSql(snp, "SELECT INDEX_NAME FROM SYS.INDEXES").stream()
+            .map(row -> (String)row.get(0))
+            .collect(Collectors.toList());
+
+        assertTrue("Index created before the snapshot must exist in the snapshot: " + currIdxNames,
+            currIdxNames.contains("SNP_IDX_0"));
+        assertTrue("Concurrently created indexes must not exist in the snapshot: " + currIdxNames,
+            Collections.disjoint(idxNames, currIdxNames));
+
+        List<List<?>> results = executeSql(snp, explainSQLStatement(tblName) + "age=2");
+        assertUsingSecondaryIndex(results);
+    }
+
+    /**
+     * @param name Table name.
+     * @return Statement counting all rows of the table.
+     */
+    private static String selectCountSQLStatement(String name) {
+        return "SELECT count(*) FROM " + name;
+    }
+
+    /**
+     * @param name Table name.
+     * @return Explain statement prefix; a {@code WHERE} condition must be appended to it.
+     */
+    private static String explainSQLStatement(String name) {
+        return "explain SELECT * FROM " + name + " WHERE ";
+    }
+
+    /**
+     * @param ignite Ignite instance to execute query on.
+     * @param stmt Statement to run.
+     * @param args Arguments of statement.
+     * @return Run result.
+     */
+    private static List<List<?>> executeSql(IgniteEx ignite, String stmt, Object... args) {
+        return ignite.context().query().querySqlFields(new SqlFieldsQuery(stmt).setArgs(args), true).getAll();
+    }
+
+    /**
+     * @param cache Cache to query.
+     * @param stmt Statement to run.
+     * @return Run result.
+     */
+    private static List<List<?>> executeSql(IgniteCache<?, ?> cache, String stmt) {
+        return cache.query(new SqlFieldsQuery(stmt)).getAll();
+    }
+
+    /**
+     * @param res Result of a {@code count(*)} query.
+     * @return Number of rows.
+     */
+    private static long rowsCount(List<List<?>> res) {
+        return (Long)res.get(0).get(0);
+    }
+
+    /**
+     * Checks that the explain plan mentions an {@code _IDX} index and no {@code _SCAN_} index.
+     *
+     * @param results Result of execute explain plan query.
+     */
+    private static void assertUsingSecondaryIndex(List<List<?>> results) {
+        String explainPlan = (String)results.get(0).get(0);
+
+        assertTrue(explainPlan, explainPlan.toUpperCase().contains("_IDX"));
+        assertFalse(explainPlan, explainPlan.toUpperCase().contains("_SCAN_"));
+    }
+
+    /**
+     * Executor that holds submitted tasks until {@link #unblock()} is called and then hands them,
+     * as well as all tasks submitted afterwards, to the delegate executor.
+     */
+    private static class BlockingExecutor implements Executor {
+        /** Delegate executor. */
+        private final Executor delegate;
+
+        /** Waiting tasks. Guarded by {@code this}. */
+        private final Queue<Runnable> tasks = new ArrayDeque<>();
+
+        /** {@code true} if tasks must be blocked. Guarded by {@code this}. */
+        private boolean block = true;
+
+        /**
+         * @param delegate Delegate executor.
+         */
+        public BlockingExecutor(Executor delegate) {
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void execute(@NotNull Runnable cmd) {
+            // The flag check and enqueueing must be atomic with respect to unblock(), otherwise a task
+            // queued right after the queue has been drained would never be executed.
+            if (block)
+                tasks.offer(cmd);
+            else
+                delegate.execute(cmd);
+        }
+
+        /** Unblock and schedule all held tasks for execution. */
+        public synchronized void unblock() {
+            block = false;
+
+            Runnable r;
+
+            while ((r = tasks.poll()) != null)
+                delegate.execute(r);
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index 3f69426..2e39c34 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexi
import org.apache.ignite.internal.processors.cache.persistence.db.IgniteTcBotInitNewPageTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IndexingMultithreadedLoadContinuousRestartTest;
import org.apache.ignite.internal.processors.cache.persistence.db.LongDestroyDurableBackgroundTaskTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotWithIndexesTest;
import org.apache.ignite.internal.processors.database.IgniteDbMultiNodeWithIndexingPutGetTest;
import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingPutGetTest;
import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingWalRestoreTest;
@@ -48,7 +49,8 @@ import org.junit.runners.Suite;
RebuildIndexWithHistoricalRebalanceTest.class,
IndexingMultithreadedLoadContinuousRestartTest.class,
LongDestroyDurableBackgroundTaskTest.class,
- RebuildIndexTest.class
+ RebuildIndexTest.class,
+ IgniteClusterSnapshotWithIndexesTest.class
})
public class IgnitePdsWithIndexingTestSuite {
}
diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
index 0f3645d..38c5a6b 100644
--- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
+++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
@@ -317,6 +317,11 @@ public class IgniteSpringBean implements Ignite, DisposableBean, SmartInitializi
}
/** {@inheritDoc} */
+ @Override public IgniteSnapshot snapshot() {
+ return g.snapshot();
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<MemoryMetrics> memoryMetrics() {
return DataRegionMetricsAdapter.collectionOf(dataRegionMetrics());
}