You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by na...@apache.org on 2022/12/01 14:49:28 UTC
[ignite] branch master updated: IGNITE-18076 Store snapshot operation warnings to a meta. (#10367)
This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 4e2a50ab340 IGNITE-18076 Store snapshot operation warnings to a meta. (#10367)
4e2a50ab340 is described below
commit 4e2a50ab34051a9845ab0a94d92b8cff76ef3dd9
Author: Vladimir Steshin <vl...@gmail.com>
AuthorDate: Thu Dec 1 17:49:20 2022 +0300
IGNITE-18076 Store snapshot operation warnings to a meta. (#10367)
---
.../commandline/snapshot/SnapshotCheckCommand.java | 4 +-
.../apache/ignite/util/GridCommandHandlerTest.java | 100 ++++----
.../snapshot/DataStreamerUpdatesHandler.java | 8 +-
.../snapshot/IgniteSnapshotManager.java | 135 ++++++++---
.../persistence/snapshot/SnapshotHandler.java | 4 +-
.../snapshot/SnapshotHandlerContext.java | 4 +-
.../persistence/snapshot/SnapshotMetadata.java | 23 +-
.../snapshot/SnapshotOperationRequest.java | 21 +-
.../SnapshotPartitionsQuickVerifyHandler.java | 4 +-
.../SnapshotPartitionsVerifyTaskResult.java | 31 +++
...xception.java => SnapshotWarningException.java} | 6 +-
.../visor/snapshot/VisorSnapshotCheckTask.java | 12 +-
.../snapshot/EncryptedSnapshotTest.java | 4 +-
.../snapshot/IgniteClusterSnapshotCheckTest.java | 18 +-
.../IgniteClusterSnapshotRestoreSelfTest.java | 3 +-
.../IgniteClusterSnapshotStreamerTest.java | 262 ++++++++++++++++-----
.../snapshot/IgniteSnapshotConsistencyTest.java | 3 +-
.../IgniteSnapshotRestoreFromRemoteTest.java | 4 +-
.../IgniteClusterSnapshotCheckWithIndexesTest.java | 9 +-
19 files changed, 478 insertions(+), 177 deletions(-)
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCheckCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCheckCommand.java
index 501257949a7..bda10028a8b 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCheckCommand.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCheckCommand.java
@@ -22,7 +22,7 @@ import java.util.Map;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.commandline.CommandArgIterator;
import org.apache.ignite.internal.commandline.argument.CommandArgUtils;
-import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTaskResult;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.visor.snapshot.VisorSnapshotCheckTask;
import org.apache.ignite.internal.visor.snapshot.VisorSnapshotCheckTaskArg;
@@ -82,6 +82,6 @@ public class SnapshotCheckCommand extends SnapshotSubcommand {
/** {@inheritDoc} */
@Override protected void printResult(Object res, IgniteLogger log) {
- ((IdleVerifyResultV2)res).print(log::info, true);
+ ((SnapshotPartitionsVerifyTaskResult)res).print(log::info);
}
}
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 358f143c69c..3972bf15f11 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -97,20 +97,16 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandler;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
-import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandler;
-import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerContext;
-import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerResult;
-import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerType;
-import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerWarningException;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTaskResult;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
-import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
import org.apache.ignite.internal.processors.cache.warmup.BlockedWarmUpConfiguration;
import org.apache.ignite.internal.processors.cache.warmup.BlockedWarmUpStrategy;
import org.apache.ignite.internal.processors.cache.warmup.WarmUpTestPluginProvider;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.BasicRateLimiter;
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
@@ -130,14 +126,12 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.AbstractTestPluginProvider;
-import org.apache.ignite.plugin.ExtensionRegistry;
-import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.spi.metric.Metric;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionRollbackException;
@@ -3080,52 +3074,70 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(0));
cfg.getConnectorConfiguration().setHost("localhost");
- cfg.setPluginProviders(new AbstractTestPluginProvider() {
- /** {@inheritDoc} */
- @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
- super.initExtensions(ctx, registry);
-
- // Simulates warning occurs at snapshot creation.
- registry.registerExtension(SnapshotHandler.class, new SnapshotHandler<Void>() {
- /** {@inheritDoc} */
- @Override public SnapshotHandlerType type() {
- return SnapshotHandlerType.CREATE;
- }
+ IgniteEx ig = startGrid(cfg);
- /** {@inheritDoc} */
- @Override public void complete(String name,
- Collection<SnapshotHandlerResult<Void>> results) throws Exception {
- throw new SnapshotHandlerWarningException(DataStreamerUpdatesHandler.WRN_MSG);
- }
+ cfg = getConfiguration(getTestIgniteInstanceName(1));
+ cfg.getConnectorConfiguration().setHost("localhost");
- /** {@inheritDoc} */
- @Nullable @Override public Void invoke(SnapshotHandlerContext ctx) {
- return null;
- }
- });
- }
+ startGrid(cfg);
- /** {@inheritDoc} */
- @Override public String name() {
- return "SnapshotWarningSimulationPlugin";
+ ig.cluster().state(ACTIVE);
+ createCacheAndPreload(ig, 100);
+
+ TestRecordingCommunicationSpi cm = (TestRecordingCommunicationSpi)grid(0).configuration().getCommunicationSpi();
+
+ cm.blockMessages(DataStreamerRequest.class, grid(1).name());
+
+ AtomicBoolean stopLoading = new AtomicBoolean();
+
+ IgniteInternalFuture<?> loadFut = runAsync(() -> {
+ try (IgniteDataStreamer<Integer, Integer> ds = ig.dataStreamer(DEFAULT_CACHE_NAME)) {
+ int i = 100;
+
+ while (!stopLoading.get()) {
+ ds.addData(i, i);
+
+ i++;
+ }
}
});
- IgniteEx ig = startGrid(cfg);
- ig.cluster().state(ACTIVE);
- createCacheAndPreload(ig, 100);
+ cm.waitForBlocked(IgniteDataStreamer.DFLT_PARALLEL_OPS_MULTIPLIER);
- injectTestSystemOut();
+ try {
+ injectTestSystemOut();
- CommandHandler hnd = new CommandHandler();
+ CommandHandler hnd = new CommandHandler();
+
+ List<String> args = new ArrayList<>(F.asList("--snapshot", "create", "testDsSnp", "--sync"));
+
+ int code = execute(hnd, args);
- List<String> args = new ArrayList<>(F.asList("--snapshot", "create", "testDsSnp", "--sync"));
+ assertEquals(EXIT_CODE_UNEXPECTED_ERROR, code);
- int code = execute(hnd, args);
+ LogListener logLsnr = LogListener.matches(DataStreamerUpdatesHandler.WRN_MSG).times(1).build();
+ logLsnr.accept(testOut.toString());
+ logLsnr.check();
- assertEquals(EXIT_CODE_UNEXPECTED_ERROR, code);
+ args = new ArrayList<>(F.asList("--snapshot", "check", "testDsSnp"));
- assertContains(log, testOut.toString(), DataStreamerUpdatesHandler.WRN_MSG);
+ code = execute(hnd, args);
+
+ assertEquals(EXIT_CODE_OK, code);
+
+ String out = testOut.toString();
+
+ logLsnr = LogListener.matches(DataStreamerUpdatesHandler.WRN_MSG).times(1).build();
+ logLsnr.accept(out);
+ logLsnr.check();
+
+ assertContains(log, out, "The check procedure has failed, conflict partitions has been found");
+ }
+ finally {
+ stopLoading.set(true);
+ cm.stopBlock();
+ loadFut.get();
+ }
}
/**
@@ -3253,7 +3265,7 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
StringBuilder sb = new StringBuilder();
- ((IdleVerifyResultV2)h.getLastOperationResult()).print(sb::append, true);
+ ((SnapshotPartitionsVerifyTaskResult)h.getLastOperationResult()).print(sb::append);
assertContains(log, sb.toString(), "The check procedure has finished, no conflicts have been found");
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DataStreamerUpdatesHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DataStreamerUpdatesHandler.java
index a99ce2ce820..ee608565ef0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DataStreamerUpdatesHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/DataStreamerUpdatesHandler.java
@@ -48,12 +48,12 @@ public class DataStreamerUpdatesHandler implements SnapshotHandler<Boolean> {
/** {@inheritDoc} */
@Override public void complete(String name, Collection<SnapshotHandlerResult<Boolean>> results)
- throws SnapshotHandlerWarningException {
+ throws SnapshotWarningException {
Collection<UUID> nodes = F.viewReadOnly(results, r -> r.node().id(), SnapshotHandlerResult::data);
- if (!nodes.isEmpty()) {
- throw new SnapshotHandlerWarningException(WRN_MSG + " Updates from DataStreamer detected on the nodes: " +
- nodes.stream().map(UUID::toString).collect(Collectors.joining(", ")));
+ if (!F.isEmpty(nodes)) {
+ throw new SnapshotWarningException(WRN_MSG + " Updates from DataStreamer detected on the nodes: " +
+ nodes.stream().map(UUID::toString).collect(Collectors.joining(", ")) + '.');
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index af8885c5c46..71836a10006 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -35,6 +35,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
+import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
@@ -165,6 +166,7 @@ import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -274,6 +276,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** Snapshot metafile extension. */
public static final String SNAPSHOT_METAFILE_EXT = ".smf";
+ /** Snapshot temporary metafile extension. */
+ public static final String SNAPSHOT_METAFILE_TMP_EXT = ".tmp";
+
/** Prefix for snapshot threads. */
public static final String SNAPSHOT_RUNNER_THREAD_PREFIX = "snapshot-runner";
@@ -765,11 +770,10 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
"Another snapshot operation in progress [req=" + req + ", curr=" + clusterSnpReq + ']'));
}
- if (!CU.baselineNode(cctx.localNode(), cctx.kernalContext().state().clusterState())) {
- clusterSnpReq = req;
+ clusterSnpReq = req;
+ if (!CU.baselineNode(cctx.localNode(), cctx.kernalContext().state().clusterState()))
return new GridFinishedFuture<>();
- }
Set<UUID> leftNodes = new HashSet<>(req.nodes());
leftNodes.removeAll(F.viewReadOnly(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
@@ -828,8 +832,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
((DistributedMetaStorageImpl)cctx.kernalContext().distributedMetastorage())
.suspend(((SnapshotFutureTask)task0).started());
}
-
- clusterSnpReq = req;
}
return task0.chain(fut -> {
@@ -843,11 +845,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
File snpDir = snapshotLocalDir(req.snapshotName(), req.snapshotPath());
- File smf = new File(snpDir, snapshotMetaFileName(cctx.localNode().consistentId().toString()));
-
- if (smf.exists())
- throw new GridClosureException(new IgniteException("Snapshot metafile must not exist: " + smf.getAbsolutePath()));
-
snpDir.mkdirs();
SnapshotMetadata meta = new SnapshotMetadata(req.requestId(),
@@ -861,30 +858,23 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
cctx.gridConfig().getEncryptionSpi().masterKeyDigest()
);
- try (OutputStream out = Files.newOutputStream(smf.toPath())) {
- byte[] bytes = U.marshal(marsh, meta);
- int blockSize = SNAPSHOT_LIMITED_TRANSFER_BLOCK_SIZE_BYTES;
+ SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta, req.groups(), cctx.localNode(), snpDir,
+ req.streamerWarning());
- for (int off = 0; off < bytes.length; off += blockSize) {
- int len = Math.min(blockSize, bytes.length - off);
+ req.meta(meta);
- transferRateLimiter.acquire(len);
+ File smf = new File(snpDir, snapshotMetaFileName(cctx.localNode().consistentId().toString()));
- out.write(bytes, off, len);
- }
- }
+ storeSnapshotMeta(req, smf);
log.info("Snapshot metafile has been created: " + smf.getAbsolutePath());
- SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta, req.groups(), cctx.localNode(),
- snpDir, req.streamerWarning());
-
return new SnapshotOperationResponse(handlers.invokeAll(SnapshotHandlerType.CREATE, ctx));
}
catch (IOException | IgniteCheckedException e) {
throw F.wrap(e);
}
- });
+ }, snapshotExecutorService());
}
/**
@@ -950,6 +940,31 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
}
}
+ /**
+ * Stores snapshot metadata.
+ *
+ * @param snpReq Snapshot operation request containing snapshot meta.
+ * @param smf File to store.
+ */
+ private void storeSnapshotMeta(SnapshotOperationRequest snpReq, File smf)
+ throws IgniteCheckedException, IOException {
+ if (smf.exists())
+ throw new IgniteException("Snapshot metafile must not exist: " + smf.getAbsolutePath());
+
+ try (OutputStream out = Files.newOutputStream(smf.toPath())) {
+ byte[] bytes = U.marshal(marsh, snpReq.meta());
+ int blockSize = SNAPSHOT_LIMITED_TRANSFER_BLOCK_SIZE_BYTES;
+
+ for (int off = 0; off < bytes.length; off += blockSize) {
+ int len = Math.min(blockSize, bytes.length - off);
+
+ transferRateLimiter.acquire(len);
+
+ out.write(bytes, off, len);
+ }
+ }
+ }
+
/**
* Execute the {@link SnapshotHandler#complete(String, Collection)} method of the snapshot handlers asynchronously.
*
@@ -1016,6 +1031,15 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
deleteSnapshot(snapshotLocalDir(req.snapshotName(), req.snapshotPath()), pdsSettings.folderName());
}
+ else if (!F.isEmpty(req.warnings())) {
+ // Pass the warnings further to the next stage for the case when snapshot started from not coordinator.
+ if (!isLocalNodeCoordinator(cctx.discovery()))
+ snpReq.warnings(req.warnings());
+
+ snpReq.meta().warnings(Collections.unmodifiableList(req.warnings()));
+
+ storeWarnings(snpReq);
+ }
removeLastMetaStorageKey();
}
@@ -1026,6 +1050,48 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
return new GridFinishedFuture<>(new SnapshotOperationResponse());
}
+ /**
+ * Stores snapshot creation warnings. The warnings are rare. Also, coordinator might not be a baseline node. Thus,
+ * storing meta with warnings once is to be done at second stage initialiation on any other node. Which leads to
+ * process possible snapshot errors, deleting snapshot at second stage end. Doesn't worth. If an error occurs on
+ * warnings writing, it is logged only.
+ */
+ private void storeWarnings(SnapshotOperationRequest snpReq) {
+ assert !F.isEmpty(snpReq.warnings());
+
+ List<ClusterNode> snpNodes = cctx.kernalContext().cluster().get().nodes().stream()
+ .filter(n -> snpReq.nodes().contains(n.id())).collect(Collectors.toList());
+
+ boolean oldestBaseline = U.oldest(snpNodes,
+ n -> CU.baselineNode(n, cctx.kernalContext().state().clusterState())).equals(cctx.localNode());
+
+ if (!oldestBaseline)
+ return;
+
+ File snpDir = snapshotLocalDir(snpReq.snapshotName(), snpReq.snapshotPath());
+ File tempSmf = new File(snpDir, snapshotMetaFileName(cctx.localNode().consistentId().toString()) +
+ SNAPSHOT_METAFILE_TMP_EXT);
+ File smf = new File(snpDir, snapshotMetaFileName(cctx.localNode().consistentId().toString()));
+
+ try {
+ storeSnapshotMeta(snpReq, tempSmf);
+
+ Files.move(tempSmf.toPath(), smf.toPath(), StandardCopyOption.ATOMIC_MOVE,
+ StandardCopyOption.REPLACE_EXISTING);
+
+ if (log.isDebugEnabled())
+ log.debug("Snapshot metafile has been rewrited with the warnings: " + smf.getAbsolutePath());
+ }
+ catch (Exception e) {
+ log.error("Failed to store warnings of snapshot '" + snpReq.snapshotName() +
+ "' to the snapshot metafile. Snapshot won't contain them. The warnings: [" +
+ String.join(",", snpReq.warnings()) + "].", e);
+ }
+ finally {
+ U.delete(tempSmf);
+ }
+ }
+
/**
* @param id Request id.
* @param res Results.
@@ -1046,9 +1112,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
if (clusterSnpFut != null) {
if (endFail.isEmpty() && snpReq.error() == null) {
if (!F.isEmpty(snpReq.warnings())) {
- IgniteException wrn = new IgniteException("Snapshot task '" + snpReq.snapshotName() +
- "' completed with the warnings:" + U.nl() + '\t' + String.join(U.nl() + '\t',
- snpReq.warnings()));
+ SnapshotWarningException wrn = new SnapshotWarningException("Snapshot task '" +
+ snpReq.snapshotName() + "' completed with the warnings:" + U.nl() + "\t- " +
+ String.join(U.nl() + "\t- ", snpReq.warnings()));
clusterSnpFut.onDone(wrn);
}
@@ -1307,7 +1373,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
* @return Future with the result of execution snapshot partitions verify task, which besides calculating partition
* hashes of {@link IdleVerifyResultV2} also contains the snapshot metadata distribution across the cluster.
*/
- public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name, @Nullable String snpPath) {
+ public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult> checkSnapshot(String name,
+ @Nullable String snpPath) {
A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
@@ -1315,7 +1382,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
return checkSnapshot(name, snpPath, null, false).chain(f -> {
try {
- return f.get().idleVerifyResult();
+ return f.get();
}
catch (Throwable t) {
throw new GridClosureException(t);
@@ -1784,7 +1851,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
* @param snpName Snapshot name to request.
* @param rmtSnpPath Snapshot directory path on the remote node.
* @param parts Collection of pairs group and appropriate cache partition to be snapshot.
- * @param stopChecker Node stop or prcoess interrupt checker.
+ * @param stopChecker Node stop or process interrupt checker.
* @param partHnd Received partition handler.
*/
public IgniteInternalFuture<Void> requestRemoteSnapshotFiles(
@@ -2353,7 +2420,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
try {
hnd.complete(snpName, nodesRes);
}
- catch (SnapshotHandlerWarningException e) {
+ catch (SnapshotWarningException e) {
wrns.add(e.getMessage());
}
}
@@ -3719,8 +3786,14 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
@Override protected IgniteException convertException(IgniteCheckedException e) {
if (e instanceof IgniteClientDisconnectedCheckedException)
return new IgniteException("Client disconnected. Snapshot result is unknown", U.convertException(e));
- else
+ else {
+ SnapshotWarningException wrn = X.cause(e, SnapshotWarningException.class);
+
+ if (wrn != null)
+ return new IgniteException(wrn.getMessage());
+
return new IgniteException("Snapshot has not been created", U.convertException(e));
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java
index 93b767b018d..abd97834c3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandler.java
@@ -61,12 +61,12 @@ public interface SnapshotHandler<T> extends Extension {
*
* @param name Snapshot name.
* @param results Results from all nodes.
- * @throws SnapshotHandlerWarningException If a warning of snapshot operation occured.
+ * @throws SnapshotWarningException If a warning of snapshot operation occurred.
* @throws Exception If the snapshot operation needs to be aborted.
* @see SnapshotHandlerResult
*/
public default void complete(String name, Collection<SnapshotHandlerResult<T>> results)
- throws SnapshotHandlerWarningException, Exception {
+ throws SnapshotWarningException, Exception {
for (SnapshotHandlerResult<T> res : results) {
if (res.error() == null)
continue;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
index 37cdb529b97..091a4f3aa0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerContext.java
@@ -47,7 +47,7 @@ public class SnapshotHandlerContext {
* {@code False} otherwise. Always {@code false} for snapshot restoration.
* @param locNode Local node.
* @param snpDir The full path to the snapshot files.
- * @param streamerWrn {@code True} if concurrent streaming updates occured during snapshot operation.
+ * @param streamerWrn {@code True} if concurrent streaming updates occurred during snapshot operation.
*/
public SnapshotHandlerContext(SnapshotMetadata metadata, @Nullable Collection<String> grps, ClusterNode locNode,
File snpDir, boolean streamerWrn) {
@@ -88,7 +88,7 @@ public class SnapshotHandlerContext {
}
/**
- * @return {@code True} if concurrent streaming updates occured during snapshot operation. {@code False} otherwise.
+ * @return {@code True} if concurrent streaming updates occurred during snapshot operation. {@code False} otherwise.
*/
public boolean streamerWarning() {
return streamerWrn;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
index 65bd3131f8e..0a74d50cdb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
@@ -79,6 +79,10 @@ public class SnapshotMetadata implements Serializable {
@GridToStringInclude
@Nullable private final byte[] masterKeyDigest;
+ /** Warnings occurred at snapshot creation. */
+ @GridToStringInclude
+ @Nullable private List<String> warnings;
+
/**
* F@param snpName Snapshot name.
* @param consId Consistent id of a node to which this metadata relates.
@@ -239,6 +243,22 @@ public class SnapshotMetadata implements Serializable {
return masterKeyDigest;
}
+ /**
+ * @param warnings Snapshot creation warnings.
+ */
+ public void warnings(List<String> warnings) {
+ assert this.warnings == null : "Snapshot warnings are already set. No rewriting is supposed.";
+
+ this.warnings = warnings;
+ }
+
+ /**
+ * @return Snapshot creation warnings.
+ */
+ public List<String> warnings() {
+ return warnings;
+ }
+
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
@@ -254,7 +274,8 @@ public class SnapshotMetadata implements Serializable {
consId.equals(meta.consId) &&
Objects.equals(grpIds, meta.grpIds) &&
Objects.equals(bltNodes, meta.bltNodes) &&
- Arrays.equals(masterKeyDigest, meta.masterKeyDigest);
+ Arrays.equals(masterKeyDigest, meta.masterKeyDigest) &&
+ Objects.equals(warnings, meta.warnings);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
index 8adcea9e5f9..552105c4dbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
@@ -61,10 +61,13 @@ public class SnapshotOperationRequest implements Serializable {
/**
* Snapshot operation warnings. Warnings do not interrupt snapshot process but raise exception at the end to make
- * the operation status 'not OK' if no other error occured.
+ * the operation status 'not OK' if no other error occurred.
*/
private volatile List<String> warnings;
+ /** Snapshot metadata. */
+ private transient SnapshotMetadata meta;
+
/**
* Warning flag of concurrent inconsistent-by-nature streamer updates.
*/
@@ -187,6 +190,8 @@ public class SnapshotOperationRequest implements Serializable {
* @param warnings Warnings of snapshot operation.
*/
public void warnings(List<String> warnings) {
+ assert this.warnings == null;
+
this.warnings = warnings;
}
@@ -204,6 +209,20 @@ public class SnapshotOperationRequest implements Serializable {
return streamerWrn = val;
}
+ /**
+ * @return Snapshot metadata.
+ */
+ public SnapshotMetadata meta() {
+ return meta;
+ }
+
+ /**
+ * Stores snapshot metadata.
+ */
+ public void meta(SnapshotMetadata meta) {
+ this.meta = meta;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SnapshotOperationRequest.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
index 0fcebb21d95..9a7aefbba39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
@@ -88,8 +88,8 @@ public class SnapshotPartitionsQuickVerifyHandler extends SnapshotPartitionsVeri
}));
if (!wrnGrps.isEmpty()) {
- throw new SnapshotHandlerWarningException("Cache partitions differ for cache groups " + S.toStringSortedDistinct(wrnGrps)
- + ". " + WRN_MSG);
+ throw new SnapshotWarningException("Cache partitions differ for cache groups " +
+ S.toStringSortedDistinct(wrnGrps) + ". " + WRN_MSG);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskResult.java
index 23271e76d30..31d9b15b99a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskResult.java
@@ -20,12 +20,17 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.dto.IgniteDataTransferObject;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.util.GridStringBuilder;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
@@ -66,6 +71,32 @@ public class SnapshotPartitionsVerifyTaskResult extends IgniteDataTransferObject
return metas;
}
+ /**
+ * Print formatted result to the given printer. Adds the snapshot warnings if snapshot has conflicts.
+ *
+ * @param printer Consumer for handle formatted result.
+ */
+ public void print(Consumer<String> printer) {
+ idleRes.print(printer, true);
+
+ if (!F.isEmpty(idleVerifyResult().exceptions()))
+ return;
+
+ Collection<String> wrns = F.flatCollections(F.viewReadOnly(
+ F.flatCollections(metas.values()).stream().distinct().collect(Collectors.toList()),
+ SnapshotMetadata::warnings,
+ meta -> meta != null && !F.isEmpty(meta.warnings()))
+ );
+
+ if (!F.isEmpty(wrns)) {
+ GridStringBuilder sb = new GridStringBuilder("This snapshot was created with the warnings:")
+ .a(wrns.stream().collect(Collectors.joining("", U.nl() + "\t- ", "")))
+ .nl();
+
+ printer.accept(sb.toString());
+ }
+ }
+
/**
* @return Result of cluster nodes partitions comparison.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerWarningException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotWarningException.java
similarity index 85%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerWarningException.java
rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotWarningException.java
index dec861c2071..56c52d3401e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerWarningException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotWarningException.java
@@ -21,14 +21,14 @@ import org.apache.ignite.IgniteCheckedException;
/**
* Snapshot operation warning. Warnings do not interrupt snapshot process but raise exception at the end to make the
- * operation status 'not OK' if no other error occured.
+ * operation status 'not OK' if no other error occurred.
*/
-public class SnapshotHandlerWarningException extends IgniteCheckedException {
+public class SnapshotWarningException extends IgniteCheckedException {
/** Serialization version. */
private static final long serialVersionUID = 0L;
/** */
- public SnapshotHandlerWarningException(String wrnMsg) {
+ public SnapshotWarningException(String wrnMsg) {
super(wrnMsg);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCheckTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCheckTask.java
index 202fea06de8..69474b88032 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCheckTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCheckTask.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.visor.snapshot;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
-import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTaskResult;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.visor.VisorJob;
@@ -28,17 +28,19 @@ import org.apache.ignite.internal.visor.VisorJob;
* @see IgniteSnapshotManager#checkSnapshot(String, String)
*/
@GridInternal
-public class VisorSnapshotCheckTask extends VisorSnapshotOneNodeTask<VisorSnapshotCheckTaskArg, IdleVerifyResultV2> {
+public class VisorSnapshotCheckTask extends VisorSnapshotOneNodeTask<VisorSnapshotCheckTaskArg,
+ SnapshotPartitionsVerifyTaskResult> {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
- @Override protected VisorJob<VisorSnapshotCheckTaskArg, IdleVerifyResultV2> job(VisorSnapshotCheckTaskArg arg) {
+ @Override protected VisorJob<VisorSnapshotCheckTaskArg, SnapshotPartitionsVerifyTaskResult> job(VisorSnapshotCheckTaskArg arg) {
return new VisorSnapshotCheckJob(arg, debug);
}
/** */
- private static class VisorSnapshotCheckJob extends VisorJob<VisorSnapshotCheckTaskArg, IdleVerifyResultV2> {
+ private static class VisorSnapshotCheckJob extends VisorJob<VisorSnapshotCheckTaskArg,
+ SnapshotPartitionsVerifyTaskResult> {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
@@ -51,7 +53,7 @@ public class VisorSnapshotCheckTask extends VisorSnapshotOneNodeTask<VisorSnapsh
}
/** {@inheritDoc} */
- @Override protected IdleVerifyResultV2 run(VisorSnapshotCheckTaskArg arg) throws IgniteException {
+ @Override protected SnapshotPartitionsVerifyTaskResult run(VisorSnapshotCheckTaskArg arg) throws IgniteException {
IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr();
return new IgniteFutureImpl<>(snpMgr.checkSnapshot(arg.snapshotName(), arg.snapshotPath()))
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java
index 0dfbb85f74f..e7ede620a76 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/EncryptedSnapshotTest.java
@@ -197,7 +197,7 @@ public class EncryptedSnapshotTest extends AbstractSnapshotSelfTest {
ig = startGrids(2);
- IdleVerifyResultV2 snpCheckRes = snp(ig).checkSnapshot(SNAPSHOT_NAME, null).get();
+ IdleVerifyResultV2 snpCheckRes = snp(ig).checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
for (Exception e : snpCheckRes.exceptions().values()) {
if (e.getMessage().contains("different master key digest"))
@@ -239,7 +239,7 @@ public class EncryptedSnapshotTest extends AbstractSnapshotSelfTest {
ig.cluster().state(ACTIVE);
- IdleVerifyResultV2 snpCheckRes = snp(ig).checkSnapshot(SNAPSHOT_NAME, null).get();
+ IdleVerifyResultV2 snpCheckRes = snp(ig).checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
for (Exception e : snpCheckRes.exceptions().values()) {
if (e.getMessage().contains("has encrypted caches while encryption is disabled"))
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
index d477e2533c6..c17f9004c82 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
@@ -122,7 +122,7 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
- IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get();
+ IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
StringBuilder b = new StringBuilder();
res.print(b::append, true);
@@ -147,7 +147,7 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
assertTrue(part0.toString(), part0.toFile().exists());
assertTrue(part0.toFile().delete());
- IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get();
+ IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
StringBuilder b = new StringBuilder();
res.print(b::append, true);
@@ -172,7 +172,7 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
assertTrue(dir.toString(), dir.toFile().exists());
assertTrue(U.delete(dir));
- IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get();
+ IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
StringBuilder b = new StringBuilder();
res.print(b::append, true);
@@ -196,7 +196,7 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
assertTrue(smfs[0].toString(), smfs[0].exists());
assertTrue(U.delete(smfs[0]));
- IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get();
+ IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
StringBuilder b = new StringBuilder();
res.print(b::append, true);
@@ -217,7 +217,7 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
ig0.snapshot().createSnapshot(SNAPSHOT_NAME).get();
- IdleVerifyResultV2 res = snp(ig0).checkSnapshot(SNAPSHOT_NAME, null).get();
+ IdleVerifyResultV2 res = snp(ig0).checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
StringBuilder b = new StringBuilder();
res.print(b::append, true);
@@ -270,7 +270,7 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
pageStore.finishRecover();
}
- IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get();
+ IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
StringBuilder b = new StringBuilder();
res.print(b::append, true);
@@ -318,7 +318,7 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
ignite.cluster().baselineAutoAdjustEnabled(false);
ignite.cluster().state(ACTIVE);
- IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get();
+ IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
StringBuilder b = new StringBuilder();
res.print(b::append, true);
@@ -342,7 +342,7 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
corruptPartitionFile(ignite, SNAPSHOT_NAME, dfltCacheCfg, PART_ID);
- IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get();
+ IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
StringBuilder b = new StringBuilder();
res.print(b::append, true);
@@ -448,7 +448,7 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
assertNotNull(part0);
assertTrue(part0.toString(), part0.toFile().exists());
- IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get();
+ IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
StringBuilder b = new StringBuilder();
res.print(b::append, true);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
index 0ec6648f45d..4750c59a6d8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
@@ -141,7 +141,8 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
ignite.context().cache().context().snapshotMgr().createSnapshot(SNAPSHOT_NAME, snpDir.toString()).get(TIMEOUT);
// Check snapshot.
- IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, snpDir.getAbsolutePath()).get(TIMEOUT);
+ IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, snpDir.getAbsolutePath()).get(TIMEOUT)
+ .idleVerifyResult();
StringBuilder sb = new StringBuilder();
res.print(sb::append, true);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
index 1d1cd6d7e19..f0d6141dca8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
@@ -18,11 +18,14 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -34,9 +37,12 @@ import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
import org.junit.Test;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
@@ -51,12 +57,12 @@ public class IgniteClusterSnapshotStreamerTest extends AbstractSnapshotSelfTest
/** */
private static final String INMEM_DATA_REGION = "inMemDr";
- /** */
- private IgniteSnapshotManager snpMgr;
-
/** */
private IgniteEx client;
+ /** Non-baseline.*/
+ private IgniteEx nonBaseline;
+
/** {@inheritDoc} */
@Override public void beforeTestSnapshot() throws Exception {
super.beforeTestSnapshot();
@@ -69,9 +75,15 @@ public class IgniteClusterSnapshotStreamerTest extends AbstractSnapshotSelfTest
grid(0).cluster().state(ACTIVE);
+ grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+ grid(0).cluster().setBaselineTopology(grid(0).cluster().topologyVersion());
+
+ nonBaseline = startGrid(G.allGrids().size());
+
client = startClientGrid(G.allGrids().size());
- snpMgr = snp(grid(0));
+ grid(0).createCache(dfltCacheCfg);
}
/** {@inheritDoc} */
@@ -87,39 +99,120 @@ public class IgniteClusterSnapshotStreamerTest extends AbstractSnapshotSelfTest
inMemDr.setPageEvictionMode(DataPageEvictionMode.RANDOM_2_LRU);
cfg.getDataStorageConfiguration().setDataRegionConfigurations(inMemDr);
+ cfg.setCacheConfiguration(null);
+
return cfg;
}
/**
- * Tests snapshot warning when streamer is working during snapshot creation. Default receiver.
+ * Tests snapshot warning when streamer is working during snapshot creation. Default receiver. Handling from client.
+ */
+ @Test
+ public void testStreamerWhileSnapshotDefaultClient() throws Exception {
+ doTestDataStreamerWhileSnapshot(client, false);
+ }
+
+ /**
+ * Tests snapshot warning when streamer is working during snapshot creation. Default receiver. Handling from
+ * not-coordinator node.
+ */
+ @Test
+ public void testStreamerWhileSnapshotDefaultNotCoordinator() throws Exception {
+ doTestDataStreamerWhileSnapshot(grid(1), false);
+ }
+
+ /**
+ * Tests snapshot warning when streamer is working during snapshot creation. Default receiver. Handling from
+ * coordinator node.
*/
@Test
- public void testStreamerWhileSnapshotDefault() throws Exception {
- doTestDataStreamerWhileSnapshot(false);
+ public void testStreamerWhileSnapshotDefaultCoordinator() throws Exception {
+ doTestDataStreamerWhileSnapshot(grid(0), false);
+ }
+
+ /**
+ * Tests snapshot warning when streamer is working during snapshot creation. Default receiver. Handling from
+ * non-baseline coordinator node.
+ */
+ @Test
+ public void testStreamerWhileSnapshotDefaultNotBaselineCoordinator() throws Exception {
+ grid(0).destroyCache(dfltCacheCfg.getName());
+
+ awaitPartitionMapExchange();
+
+ stopGrid(0);
+ stopGrid(1);
+ stopGrid(2);
+
+ startGrid(getTestIgniteInstanceName(0));
+ startGrid(getTestIgniteInstanceName(1));
+ startGrid(getTestIgniteInstanceName(2));
+
+ nonBaseline.createCache(dfltCacheCfg);
+
+ assert U.isLocalNodeCoordinator(nonBaseline.context().discovery());
+
+ doTestDataStreamerWhileSnapshot(nonBaseline, false);
+ }
+
+ /**
+ * Tests snapshot warning when streamer is working during snapshot creation. Default receiver. Handling from
+ * non-baseline node.
+ */
+ @Test
+ public void testStreamerWhileSnapshotDefaultNotBaseline() throws Exception {
+ doTestDataStreamerWhileSnapshot(nonBaseline, false);
}
/**
* Tests snapshot warning when streamer is working during snapshot creation. Overwriting receiver.
+ * Handling from client.
*/
@Test
- public void testStreamerWhileSnapshotOverwriting() throws Exception {
- doTestDataStreamerWhileSnapshot(true);
+ public void testStreamerWhileSnapshotOverwritingClient() throws Exception {
+ doTestDataStreamerWhileSnapshot(client, true);
}
/**
- * Tests snapshot warning when streamer failed or canceled before snapshot. Default receiver.
+ * Tests snapshot warning when streamer failed or canceled before snapshot. Default receiver. Handling from client
+ * node.
*/
@Test
- public void testStreamerFailsLongAgoDefault() throws Exception {
- doTestDataStreamerFailedBeforeSnapshot(false);
+ public void testStreamerFailsLongAgoDefaultClient() throws Exception {
+ doTestDataStreamerFailedBeforeSnapshot(client, false);
}
/**
- * Tests snapshot warning when streamer failed or canceled before snapshot. Overwriting receiver.
+ * Tests snapshot warning when streamer failed or canceled before snapshot. Default receiver. Handling from
+ * coordinator node.
*/
@Test
- public void testStreamerFailsLongAgoOverwriting() throws Exception {
- doTestDataStreamerFailedBeforeSnapshot(true);
+ public void testStreamerFailsLongAgoDefaultCoordinator() throws Exception {
+ doTestDataStreamerFailedBeforeSnapshot(grid(0), false);
+ }
+
+ /**
+ * Tests snapshot warning when streamer failed or canceled before snapshot. Overwriting receiver. Handling from
+ * client node.
+ */
+ @Test
+ public void testStreamerFailsLongAgoOverwritingClient() throws Exception {
+ doTestDataStreamerFailedBeforeSnapshot(client, true);
+ }
+
+ /**
+ * Tests snapshot warning is restored from non-holding warning meta node.
+ */
+ @Test
+ public void testMetaWarningRestoredByOnlyOneNode() throws Exception {
+ doTestDataStreamerWhileSnapshot(client, false);
+
+ // Check snapshot holding by only one node.
+ stopGrid(0);
+ stopGrid(1);
+
+ createAndCheckSnapshot(client, false, DataStreamerUpdatesHandler.WRN_MSG,
+ SnapshotPartitionsQuickVerifyHandler.WRN_MSG);
}
/**
@@ -141,7 +234,7 @@ public class IgniteClusterSnapshotStreamerTest extends AbstractSnapshotSelfTest
IgniteInternalFuture<?> loadFut = runLoad(grid(0), false, stopLoad);
try {
- assertThrows(null, () -> snpMgr.createSnapshot(SNAPSHOT_NAME).get(), IgniteException.class,
+ assertThrows(null, () -> snp(client).createSnapshot(SNAPSHOT_NAME).get(), IgniteException.class,
DataStreamerUpdatesHandler.WRN_MSG);
}
finally {
@@ -152,7 +245,7 @@ public class IgniteClusterSnapshotStreamerTest extends AbstractSnapshotSelfTest
grid(0).destroyCache(cname);
grid(0).destroyCache(dfltCacheCfg.getName());
- snpMgr.restoreSnapshot(SNAPSHOT_NAME, Collections.singletonList(cname)).get();
+ snp(grid(1)).restoreSnapshot(SNAPSHOT_NAME, Collections.singletonList(cname)).get();
for (int i = 0; i < 100; ++i)
assertEquals(i, grid(0).cache(cname).get(i));
@@ -162,10 +255,11 @@ public class IgniteClusterSnapshotStreamerTest extends AbstractSnapshotSelfTest
* Tests streaming into in-memory cache doesn't affect snapshot.
*/
@Test
- public void testStreamingIntoInMememoryDoesntAffectSnapshot() throws Exception {
+ public void testStreamingIntoInMemoryDoesntAffectSnapshot() throws Exception {
String cache2Name = "cache2";
int loadCnt = 1000;
+ dfltCacheCfg.setEncryptionEnabled(encryption);
grid(0).createCache(new CacheConfiguration<>(dfltCacheCfg).setName(cache2Name));
try (IgniteDataStreamer<Object, Object> ds = grid(0).dataStreamer(cache2Name)) {
@@ -173,9 +267,9 @@ public class IgniteClusterSnapshotStreamerTest extends AbstractSnapshotSelfTest
ds.addData(i, i);
}
- grid(0).destroyCache(dfltCacheCfg.getName());
- dfltCacheCfg.setDataRegionName(INMEM_DATA_REGION);
+ grid(0).destroyCache(DEFAULT_CACHE_NAME);
dfltCacheCfg.setEncryptionEnabled(false);
+ dfltCacheCfg.setDataRegionName(INMEM_DATA_REGION);
grid(0).createCache(dfltCacheCfg);
AtomicBoolean stop = new AtomicBoolean();
@@ -183,7 +277,7 @@ public class IgniteClusterSnapshotStreamerTest extends AbstractSnapshotSelfTest
IgniteInternalFuture<?> loadFut = runLoad(client, false, stop);
try {
- snpMgr.createSnapshot(SNAPSHOT_NAME).get();
+ snp(client).createSnapshot(SNAPSHOT_NAME).get();
}
finally {
stop.set(true);
@@ -192,7 +286,7 @@ public class IgniteClusterSnapshotStreamerTest extends AbstractSnapshotSelfTest
grid(0).destroyCache(cache2Name);
- snpMgr.restoreSnapshot(SNAPSHOT_NAME, null).get();
+ snp(grid(1)).restoreSnapshot(SNAPSHOT_NAME, null).get();
for (int i = 0; i < loadCnt; ++i)
assertEquals(i, grid(0).cache(cache2Name).get(i));
@@ -203,28 +297,41 @@ public class IgniteClusterSnapshotStreamerTest extends AbstractSnapshotSelfTest
*
* @param allowOverwrite 'allowOverwrite' setting.
*/
- private void doTestDataStreamerWhileSnapshot(boolean allowOverwrite) throws Exception {
+ private void doTestDataStreamerWhileSnapshot(IgniteEx snpHnd, boolean allowOverwrite) throws Exception {
AtomicBoolean stopLoading = new AtomicBoolean();
- TestRecordingCommunicationSpi clientCm =
- (TestRecordingCommunicationSpi)client.configuration().getCommunicationSpi();
+ TestRecordingCommunicationSpi cm = (TestRecordingCommunicationSpi)client.configuration().getCommunicationSpi();
IgniteInternalFuture<?> loadFut = runLoad(client, allowOverwrite, stopLoading);
- clientCm.blockMessages(DataStreamerRequest.class, grid(0).name());
+ cm.blockMessages(DataStreamerRequest.class, grid(0).name());
- clientCm.waitForBlocked(batchesPerNode(grid(0)));
+ cm.waitForBlocked(batchesPerNode(grid(0)));
+
+ String expectedWrn = allowOverwrite ? null : DataStreamerUpdatesHandler.WRN_MSG;
+ String notExpWrn = allowOverwrite ? null : SnapshotPartitionsQuickVerifyHandler.WRN_MSG;
try {
- if (allowOverwrite)
- createAndCheckSnapshot(null, null);
- else {
- createAndCheckSnapshot(DataStreamerUpdatesHandler.WRN_MSG,
- SnapshotPartitionsQuickVerifyHandler.WRN_MSG);
+ SnapshotPartitionsVerifyTaskResult checkRes = createAndCheckSnapshot(snpHnd, true, expectedWrn,
+ notExpWrn);
+
+ if (expectedWrn != null) {
+ Map<String, SnapshotMetadata> metaByNodes = checkRes.metas().values().stream().flatMap(List::stream)
+ .distinct().collect(Collectors.toMap(SnapshotMetadata::consistentId, Function.identity()));
+
+ for (SnapshotMetadata m : metaByNodes.values()) {
+ // Check warnings are stored on coordinator only.
+ if (m.consistentId().equals(grid(0).cluster().localNode().consistentId().toString())) {
+ assertTrue(!F.isEmpty(m.warnings()) && m.warnings().size() == 1 &&
+ m.warnings().get(0).contains(expectedWrn));
+ }
+ else
+ assertTrue(F.isEmpty(m.warnings()));
+ }
}
}
finally {
- clientCm.stopBlock();
+ cm.stopBlock();
stopLoading.set(true);
loadFut.get();
}
@@ -233,20 +340,22 @@ public class IgniteClusterSnapshotStreamerTest extends AbstractSnapshotSelfTest
/**
* Tests snapshot warning when streamer failed or canceled before snapshot.
*
+ * @param snpHnd Snapshot handler node.
* @param allowOverwrite 'allowOverwrite' setting.
*/
- private void doTestDataStreamerFailedBeforeSnapshot(boolean allowOverwrite) throws Exception {
- TestRecordingCommunicationSpi clientCm =
- (TestRecordingCommunicationSpi)client.configuration().getCommunicationSpi();
+ private void doTestDataStreamerFailedBeforeSnapshot(IgniteEx snpHnd, boolean allowOverwrite) throws Exception {
+ IgniteEx newClient = startClientGrid();
+
+ UUID newClientId = newClient.localNode().id();
- UUID clientId = client.localNode().id();
+ TestRecordingCommunicationSpi cm = (TestRecordingCommunicationSpi)newClient.configuration().getCommunicationSpi();
CountDownLatch nodeGoneLatch = new CountDownLatch(1);
grid(0).events().localListen(e -> {
assert e instanceof DiscoveryEvent;
- if (((DiscoveryEvent)e).eventNode().id().equals(clientId))
+ if (((DiscoveryEvent)e).eventNode().id().equals(newClientId))
nodeGoneLatch.countDown();
return false;
@@ -254,13 +363,13 @@ public class IgniteClusterSnapshotStreamerTest extends AbstractSnapshotSelfTest
AtomicBoolean stopLoading = new AtomicBoolean();
- IgniteInternalFuture<?> loadFut = runLoad(client, allowOverwrite, stopLoading);
+ IgniteInternalFuture<?> loadFut = runLoad(newClient, allowOverwrite, stopLoading);
- clientCm.blockMessages(DataStreamerRequest.class, grid(0).name());
+ cm.blockMessages(DataStreamerRequest.class, grid(0).name());
- clientCm.waitForBlocked(batchesPerNode(grid(0)));
+ cm.waitForBlocked(batchesPerNode(grid(0)));
- runAsync(() -> stopGrid(client.name(), true));
+ runAsync(() -> stopGrid(newClient.name(), true));
nodeGoneLatch.await();
@@ -268,9 +377,9 @@ public class IgniteClusterSnapshotStreamerTest extends AbstractSnapshotSelfTest
loadFut.cancel();
if (allowOverwrite)
- createAndCheckSnapshot(null, null);
+ createAndCheckSnapshot(snpHnd, true, null, null);
else {
- createAndCheckSnapshot(SnapshotPartitionsQuickVerifyHandler.WRN_MSG,
+ createAndCheckSnapshot(snpHnd, true, SnapshotPartitionsQuickVerifyHandler.WRN_MSG,
DataStreamerUpdatesHandler.WRN_MSG);
}
}
@@ -306,27 +415,56 @@ public class IgniteClusterSnapshotStreamerTest extends AbstractSnapshotSelfTest
}
/** */
- private void createAndCheckSnapshot(String expWrn, String notexpWrn) throws IgniteCheckedException {
- assert notexpWrn == null || expWrn != null;
+ private SnapshotPartitionsVerifyTaskResult createAndCheckSnapshot(IgniteEx snpHnd, boolean create,
+ String expWrn, String notExpWrn) throws Exception {
+ assert notExpWrn == null || expWrn != null;
- if (expWrn == null)
- snpMgr.createSnapshot(SNAPSHOT_NAME, null).get();
- else {
- Throwable snpWrn = assertThrows(
- null,
- () -> snpMgr.createSnapshot(SNAPSHOT_NAME, null).get(),
- IgniteException.class,
- expWrn
- );
-
- if (notexpWrn != null)
- assertTrue(!snpWrn.getMessage().contains(notexpWrn));
+ if (create) {
+ if (expWrn == null)
+ snp(snpHnd).createSnapshot(SNAPSHOT_NAME, null).get();
+ else {
+ Throwable snpWrn = assertThrows(
+ null,
+ () -> snp(snpHnd).createSnapshot(SNAPSHOT_NAME, null).get(),
+ IgniteException.class,
+ expWrn
+ );
+
+ if (notExpWrn != null)
+ assertTrue(!snpWrn.getMessage().contains(notExpWrn));
+ }
}
- IdleVerifyResultV2 checkRes = snpMgr.checkSnapshot(SNAPSHOT_NAME, null).get();
+ SnapshotPartitionsVerifyTaskResult checkRes = snp(snpHnd).checkSnapshot(SNAPSHOT_NAME, null).get();
assertTrue(checkRes.exceptions().isEmpty());
- assertTrue((expWrn != null) == checkRes.hasConflicts());
+ assertTrue((expWrn != null) == checkRes.idleVerifyResult().hasConflicts());
+
+ if (expWrn != null) {
+ ListeningTestLogger testLog = new ListeningTestLogger();
+
+ LogListener lsnr = LogListener.matches(expWrn).times(1).build();
+
+ testLog.registerListener(lsnr);
+
+ checkRes.print(testLog::info);
+
+ lsnr.check();
+
+ if (notExpWrn != null) {
+ testLog = new ListeningTestLogger();
+
+ lsnr = LogListener.matches(notExpWrn).times(0).build();
+
+ testLog.registerListener(lsnr);
+
+ checkRes.print(testLog::info);
+
+ lsnr.check();
+ }
+ }
+
+ return checkRes;
}
/** */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotConsistencyTest.java
index 75ca2969053..3c1dedb3b54 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotConsistencyTest.java
@@ -203,7 +203,8 @@ public class IgniteSnapshotConsistencyTest extends GridCommonAbstractTest {
snpFut.get(getTestTimeout());
putFut.get(getTestTimeout());
- IdleVerifyResultV2 snpVerifyRes = crd.context().cache().context().snapshotMgr().checkSnapshot(SNAPSHOT_NAME, null).get();
+ IdleVerifyResultV2 snpVerifyRes = crd.context().cache().context().snapshotMgr()
+ .checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
assertFalse(snpVerifyRes.hasConflicts());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
index 3c4d3ae3221..f92a05cb7b6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
@@ -195,8 +195,8 @@ public class IgniteSnapshotRestoreFromRemoteTest extends IgniteClusterSnapshotRe
awaitPartitionMapExchange();
// Ensure that the snapshot check command succeeds.
- IdleVerifyResultV2 res =
- emptyNode.context().cache().context().snapshotMgr().checkSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);
+ IdleVerifyResultV2 res = emptyNode.context().cache().context().snapshotMgr()
+ .checkSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT).idleVerifyResult();
StringBuilder buf = new StringBuilder();
res.print(buf::append, true);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java
index 54819c6bce5..81aee9f691a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java
@@ -44,7 +44,8 @@ public class IgniteClusterSnapshotCheckWithIndexesTest extends AbstractSnapshotS
ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
- IdleVerifyResultV2 res = ignite.context().cache().context().snapshotMgr().checkSnapshot(SNAPSHOT_NAME, null).get();
+ IdleVerifyResultV2 res = ignite.context().cache().context().snapshotMgr()
+ .checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
StringBuilder b = new StringBuilder();
res.print(b::append, true);
@@ -61,7 +62,8 @@ public class IgniteClusterSnapshotCheckWithIndexesTest extends AbstractSnapshotS
ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
- IdleVerifyResultV2 res = ignite.context().cache().context().snapshotMgr().checkSnapshot(SNAPSHOT_NAME, null).get();
+ IdleVerifyResultV2 res = ignite.context().cache().context().snapshotMgr()
+ .checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
StringBuilder b = new StringBuilder();
res.print(b::append, true);
@@ -87,7 +89,8 @@ public class IgniteClusterSnapshotCheckWithIndexesTest extends AbstractSnapshotS
grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
- IdleVerifyResultV2 res = grid(0).context().cache().context().snapshotMgr().checkSnapshot(SNAPSHOT_NAME, null).get();
+ IdleVerifyResultV2 res = grid(0).context().cache().context().snapshotMgr()
+ .checkSnapshot(SNAPSHOT_NAME, null).get().idleVerifyResult();
StringBuilder b = new StringBuilder();
res.print(b::append, true);