You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2021/08/04 09:58:49 UTC
[ignite] branch master updated: IGNITE-15062 Events for snapshot
restore operation. (#9240)
This is an automated email from the ASF dual-hosted git repository.
mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5d8d637 IGNITE-15062 Events for snapshot restore operation. (#9240)
5d8d637 is described below
commit 5d8d63768e5d856b503fff5f80f9d65df678557f
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Wed Aug 4 12:58:32 2021 +0300
IGNITE-15062 Events for snapshot restore operation. (#9240)
---
.../java/org/apache/ignite/events/EventType.java | 47 +++++++++++++++++++-
.../snapshot/IgniteSnapshotManager.java | 2 +-
.../snapshot/SnapshotRestoreProcess.java | 51 +++++++++++++++++++---
.../snapshot/AbstractSnapshotSelfTest.java | 5 ++-
.../IgniteClusterSnapshotRestoreSelfTest.java | 33 ++++++++++++++
.../snapshot/IgniteClusterSnapshotSelfTest.java | 14 +++---
...niteClusterSnapshotRestoreWithIndexingTest.java | 24 ++++++++++
7 files changed, 159 insertions(+), 17 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index 3971ed0..f82c391 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -969,6 +969,48 @@ public interface EventType {
public static final int EVT_NODE_VALIDATION_FAILED = 170;
/**
+ * Built-in event type: Cluster snapshot restore has been started event.
+ *
+ * <p>
+ * Fired on the initiator node when a snapshot restore operation is started.
+ * </p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see IgniteSnapshot#restoreSnapshot(String, Collection)
+ * @see IgniteSnapshot#cancelSnapshotRestore(String)
+ */
+ public static final int EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED = 171;
+
+ /**
+ * Built-in event type: Cluster snapshot restore has been finished event.
+ *
+ * <p>
+ * Fired on the initiator node when the snapshot restore operation has completed on all nodes.
+ * </p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see IgniteSnapshot#restoreSnapshot(String, Collection)
+ * @see IgniteSnapshot#cancelSnapshotRestore(String)
+ */
+ public static final int EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED = 172;
+
+ /**
+ * Built-in event type: Cluster snapshot restore has failed event.
+ *
+ * <p>
+ * Fired on the initiator node when the snapshot restore operation failed.
+ * </p>
+ * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+ * internal Ignite events and should not be used by user-defined events.
+ *
+ * @see IgniteSnapshot#restoreSnapshot(String, Collection)
+ * @see IgniteSnapshot#cancelSnapshotRestore(String)
+ */
+ public static final int EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED = 173;
+
+ /**
* All cluster snapshot events. This array can be directly passed into
* {@link IgniteEvents#localListen(IgnitePredicate, int...)} method to
* subscribe to all cluster snapshot events.
@@ -978,7 +1020,10 @@ public interface EventType {
public static final int[] EVTS_CLUSTER_SNAPSHOT = {
EVT_CLUSTER_SNAPSHOT_STARTED,
EVT_CLUSTER_SNAPSHOT_FINISHED,
- EVT_CLUSTER_SNAPSHOT_FAILED
+ EVT_CLUSTER_SNAPSHOT_FAILED,
+ EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED,
+ EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED,
+ EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED
};
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index f9ea6a5..45ec8d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -1550,7 +1550,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
* @param msg Event message.
* @param type Snapshot event type.
*/
- private void recordSnapshotEvent(String snpName, String msg, int type) {
+ void recordSnapshotEvent(String snpName, String msg, int type) {
if (!cctx.gridEvents().isRecordable(type) || !cctx.gridEvents().hasListener(type))
return;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 5061532..49c9c76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -42,6 +42,7 @@ import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -86,6 +87,12 @@ public class SnapshotRestoreProcess {
/** Reject operation message. */
private static final String OP_REJECT_MSG = "Cache group restore operation was rejected. ";
+ /** Snapshot restore operation finish message. */
+ private static final String OP_FINISHED_MSG = "Cache groups have been successfully restored from the snapshot";
+
+ /** Snapshot restore operation failed message. */
+ private static final String OP_FAILED_MSG = "Failed to restore snapshot cache groups";
+
/** Kernal context. */
private final GridKernalContext ctx;
@@ -151,6 +158,7 @@ public class SnapshotRestoreProcess {
* @return Future that will be completed when the restore operation is complete and the cache groups are started.
*/
public IgniteFuture<Void> start(String snpName, @Nullable Collection<String> cacheGrpNames) {
+ IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
ClusterSnapshotFuture fut0;
try {
@@ -168,7 +176,7 @@ public class SnapshotRestoreProcess {
if (!IgniteFeatures.allNodesSupports(ctx.grid().cluster().nodes(), SNAPSHOT_RESTORE_CACHE_GROUP))
throw new IgniteException(OP_REJECT_MSG + "Not all nodes in the cluster support restore operation.");
- if (ctx.cache().context().snapshotMgr().isSnapshotCreating())
+ if (snpMgr.isSnapshotCreating())
throw new IgniteException(OP_REJECT_MSG + "A cluster snapshot operation is in progress.");
synchronized (this) {
@@ -181,10 +189,41 @@ public class SnapshotRestoreProcess {
}
}
catch (IgniteException e) {
+ snpMgr.recordSnapshotEvent(
+ snpName,
+ OP_FAILED_MSG + ": " + e.getMessage(),
+ EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED
+ );
+
return new IgniteFinishedFutureImpl<>(e);
}
- ctx.cache().context().snapshotMgr().checkSnapshot(snpName, cacheGrpNames).listen(f -> {
+ fut0.listen(f -> {
+ if (f.error() != null) {
+ snpMgr.recordSnapshotEvent(
+ snpName,
+ OP_FAILED_MSG + ": " + f.error().getMessage() + " [reqId=" + fut0.rqId + "].",
+ EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED
+ );
+ }
+ else {
+ snpMgr.recordSnapshotEvent(
+ snpName,
+ OP_FINISHED_MSG + " [reqId=" + fut0.rqId + "].",
+ EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED
+ );
+ }
+ });
+
+ String msg = "Cluster-wide snapshot restore operation started [reqId=" + fut0.rqId + ", snpName=" + snpName +
+ (cacheGrpNames == null ? "" : ", grps=" + cacheGrpNames) + ']';
+
+ if (log.isInfoEnabled())
+ log.info(msg);
+
+ snpMgr.recordSnapshotEvent(snpName, msg, EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED);
+
+ snpMgr.checkSnapshot(snpName, cacheGrpNames).listen(f -> {
if (f.error() != null) {
finishProcess(fut0.rqId, f.error());
@@ -263,8 +302,8 @@ public class SnapshotRestoreProcess {
return;
}
- SnapshotOperationRequest req = new SnapshotOperationRequest(
- fut0.rqId, F.first(dataNodes), snpName, cacheGrpNames, dataNodes);
+ SnapshotOperationRequest req =
+ new SnapshotOperationRequest(fut0.rqId, F.first(dataNodes), snpName, cacheGrpNames, dataNodes);
prepareRestoreProc.start(req.requestId(), req);
});
@@ -358,9 +397,9 @@ public class SnapshotRestoreProcess {
*/
private void finishProcess(UUID reqId, @Nullable Throwable err) {
if (err != null)
- log.error("Failed to restore snapshot cache group [reqId=" + reqId + ']', err);
+ log.error(OP_FAILED_MSG + " [reqId=" + reqId + "].", err);
else if (log.isInfoEnabled())
- log.info("Successfully restored cache group(s) from the snapshot [reqId=" + reqId + ']');
+ log.info(OP_FINISHED_MSG + " [reqId=" + reqId + "].");
SnapshotRestoreContext opCtx0 = opCtx;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index c6a3055..0175513 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -26,6 +26,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -174,8 +175,8 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
* @param evts Events to check.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
- protected void waitForEvents(List<Integer> evts) throws IgniteInterruptedCheckedException {
- boolean caught = waitForCondition(() -> locEvts.containsAll(evts), 10_000);
+ protected void waitForEvents(Integer... evts) throws IgniteInterruptedCheckedException {
+ boolean caught = waitForCondition(() -> locEvts.containsAll(Arrays.asList(evts)), TIMEOUT);
assertTrue("Events must be caught [locEvts=" + locEvts + ']', caught);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
index c4d6a7e..c59b797 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
@@ -69,6 +69,9 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
+import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED;
+import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
@@ -153,6 +156,8 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
assertCacheKeys(ignite.cache(CACHE1), CACHE_KEYS_RANGE);
assertCacheKeys(ignite.cache(CACHE2), CACHE_KEYS_RANGE);
+
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
}
/** @throws Exception If failed. */
@@ -253,6 +258,8 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class, null);
ensureCacheAbsent(dfltCacheCfg);
+
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED);
}
/**
@@ -330,6 +337,8 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
GridTestUtils.assertThrowsAnyCause(
log, () -> fut.get(TIMEOUT), IgniteException.class, "The cluster should be active");
+
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED);
}
/** @throws Exception If failed. */
@@ -347,6 +356,8 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), IgniteIllegalStateException.class, null);
ensureCacheAbsent(dfltCacheCfg);
+
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED);
}
/** @throws Exception If failed. */
@@ -368,6 +379,8 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
awaitPartitionMapExchange();
+ locEvts.clear();
+
IgniteSnapshot snp = ignite.snapshot();
GridTestUtils.assertThrowsAnyCause(
@@ -377,6 +390,11 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
"Cache group(s) was not found in the snapshot"
);
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED);
+ assertEquals(2, locEvts.size());
+
+ locEvts.clear();
+
ignite.cache(CACHE2).destroy();
awaitPartitionMapExchange();
@@ -385,6 +403,9 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
assertCacheKeys(ignite.cache(CACHE1), CACHE_KEYS_RANGE);
assertCacheKeys(ignite.cache(CACHE2), CACHE_KEYS_RANGE);
+
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
+ assertEquals(2, locEvts.size());
}
/** @throws Exception If failed. */
@@ -537,6 +558,8 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
ensureCacheAbsent(dfltCacheCfg);
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED);
+
GridTestUtils.assertThrowsAnyCause(
log,
() -> startGrid(3),
@@ -601,6 +624,8 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
files = node2dbDir.listFiles(file -> file.getName().startsWith(TMP_CACHE_DIR_PREFIX));
assertEquals("A temp directory should be removed at node startup", 0, files.length);
+
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED);
}
/** @throws Exception If failed. */
@@ -683,6 +708,8 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(nodesCnt - 1));
+ locEvts.clear();
+
IgniteFuture<Void> fut = waitForBlockOnRestore(spi, procType, DEFAULT_CACHE_NAME);
ignite.cluster().state(state);
@@ -696,11 +723,17 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
+
return;
}
GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), exCls, expMsg);
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED);
+
+ assertEquals(2, locEvts.size());
+
ignite.cluster().state(ClusterState.ACTIVE);
ensureCacheAbsent(dfltCacheCfg);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
index c0946b4..64b1e16 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
@@ -212,7 +212,7 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
fut.get();
- waitForEvents(Arrays.asList(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FINISHED));
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FINISHED);
}
finally {
loadFut.cancel();
@@ -554,7 +554,7 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
- waitForEvents(Arrays.asList(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FAILED));
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FAILED);
}
/** @throws Exception If fails. */
@@ -589,7 +589,7 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
IgniteCheckedException.class,
"Execution of local snapshot tasks fails");
- waitForEvents(Arrays.asList(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FAILED));
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FAILED);
assertTrue("Snapshot directory must be empty for node 0 due to snapshot future fail: " + dirNameIgnite0,
!searchDirectoryRecursively(locSnpDir.toPath(), dirNameIgnite0).isPresent());
@@ -691,7 +691,7 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
fut.get();
- waitForEvents(Arrays.asList(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FINISHED));
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FINISHED);
stopAllGrids();
@@ -866,7 +866,7 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
- waitForEvents(Arrays.asList(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FINISHED));
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FINISHED);
stopAllGrids();
@@ -1071,7 +1071,7 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
clnt.snapshot().createSnapshot(SNAPSHOT_NAME).get();
- waitForEvents(Arrays.asList(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FINISHED));
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FINISHED);
stopAllGrids();
@@ -1131,7 +1131,7 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
doSnapshotCancellationTest(startCli, Collections.singletonList(srv), srv.cache(dfltCacheCfg.getName()),
snpName -> killCli.snapshot().cancelSnapshot(snpName).get());
- waitForEvents(Arrays.asList(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FAILED));
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_STARTED, EVT_CLUSTER_SNAPSHOT_FAILED);
}
/** @throws Exception If fails. */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
index 17f495c..11f5f8c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -29,13 +31,19 @@ import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.events.SnapshotEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
+import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
+import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED;
+import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
+
/**
* Cluster snapshot restore tests verifying SQL and indexing.
*/
@@ -74,6 +82,8 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
assertCacheKeys(client.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
}
/** @throws Exception If failed. */
@@ -94,6 +104,8 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
for (Ignite grid : G.allGrids())
assertNotNull(((IgniteEx)grid).context().cacheObjects().metadata(typeId));
+
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
}
/** @throws Exception If failed. */
@@ -109,6 +121,10 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
IgniteEx ignite = startGrid(nodesCnt - 1);
+ List<SnapshotEvent> evts = new CopyOnWriteArrayList<>();
+
+ ignite.events().localListen(e -> e instanceof SnapshotEvent && evts.add((SnapshotEvent)e), EVTS_CLUSTER_SNAPSHOT);
+
resetBaselineTopology();
awaitPartitionMapExchange();
@@ -131,6 +147,14 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
awaitPartitionMapExchange();
assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE);
+
+ GridTestUtils.waitForCondition(() -> evts.size() == 2, TIMEOUT);
+ assertEquals(2, evts.size());
+
+ SnapshotEvent startEvt = evts.get(0);
+
+ assertEquals(SNAPSHOT_NAME, startEvt.snapshotName());
+ assertTrue(startEvt.message().contains("grps=[" + DEFAULT_CACHE_NAME + ']'));
}
/** {@inheritDoc} */