You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/09/23 13:43:04 UTC
[ignite] branch IGNITE-17177_inc_snapshots updated: IGNITE-17645 Write lastSnpSeg into SnapshotMetadata (#10247)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch IGNITE-17177_inc_snapshots
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/IGNITE-17177_inc_snapshots by this push:
new f13743d8be6 IGNITE-17645 Write lastSnpSeg into SnapshotMetadata (#10247)
f13743d8be6 is described below
commit f13743d8be60e0023c04430ff2bf6cafbe3dd06a
Author: Maksim Timonin <ti...@gmail.com>
AuthorDate: Fri Sep 23 16:42:57 2022 +0300
IGNITE-17645 Write lastSnpSeg into SnapshotMetadata (#10247)
---
.../systemview/walker/SnapshotViewWalker.java | 4 +-
.../snapshot/IgniteSnapshotManager.java | 7 +-
.../persistence/snapshot/SnapshotFutureTask.java | 12 ++-
.../snapshot/SnapshotFutureTaskResult.java | 52 +++++++++++++
.../persistence/snapshot/SnapshotMetadata.java | 15 ++++
.../ignite/spi/systemview/view/SnapshotView.java | 35 ++++++---
.../ignite/internal/metric/SystemViewSelfTest.java | 1 +
.../IgniteClusterSnapshotWalRecordTest.java | 91 +++++++++++++++++++---
8 files changed, 188 insertions(+), 29 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SnapshotViewWalker.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SnapshotViewWalker.java
index bee5fd35b21..9a513f10699 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SnapshotViewWalker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SnapshotViewWalker.java
@@ -33,6 +33,7 @@ public class SnapshotViewWalker implements SystemViewRowAttributeWalker<Snapshot
v.accept(1, "consistentId", String.class);
v.accept(2, "baselineNodes", String.class);
v.accept(3, "cacheGroups", String.class);
+ v.accept(4, "snapshotRecordSegment", Long.class);
}
/** {@inheritDoc} */
@@ -41,10 +42,11 @@ public class SnapshotViewWalker implements SystemViewRowAttributeWalker<Snapshot
v.accept(1, "consistentId", String.class, row.consistentId());
v.accept(2, "baselineNodes", String.class, row.baselineNodes());
v.accept(3, "cacheGroups", String.class, row.cacheGroups());
+ v.accept(4, "snapshotRecordSegment", Long.class, row.snapshotRecordSegment());
}
/** {@inheritDoc} */
@Override public int count() {
- return 4;
+ return 5;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index e19e5c38a75..e27d6161d53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -822,6 +822,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
snpDir.mkdirs();
+ SnapshotFutureTaskResult res = (SnapshotFutureTaskResult)fut.result();
+
SnapshotMetadata meta = new SnapshotMetadata(req.requestId(),
req.snapshotName(),
cctx.localNode().consistentId().toString(),
@@ -829,7 +831,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
cctx.gridConfig().getDataStorageConfiguration().getPageSize(),
grpIds,
blts,
- (Set<GroupPartitionId>)fut.result(),
+ res.parts(),
+ res.snapshotPointer(),
cctx.gridConfig().getEncryptionSpi().masterKeyDigest()
);
@@ -2183,7 +2186,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
List<File> dirs = snapshotCacheDirectories(meta.snapshotName(), null, meta.folderName(), name -> true);
Collection<String> cacheGrps = F.viewReadOnly(dirs, FilePageStoreManager::cacheGroupName);
- return new SnapshotView(meta.snapshotName(), meta.consistentId(), F.concat(meta.baselineNodes(), ","), F.concat(cacheGrps, ","));
+ return new SnapshotView(meta, cacheGrps);
}
/** @return Snapshot handlers. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 579da37ee8e..aa5326aa9af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
import org.apache.ignite.internal.processors.marshaller.MappedName;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
@@ -94,7 +95,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.snapshot.I
* If partitions for particular cache group are not provided that they will be collected and added
* on checkpoint under the write-lock.
*/
-class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId>> implements CheckpointListener {
+class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskResult> implements CheckpointListener {
/** File page store manager for accessing cache group associated files. */
private final FilePageStoreManager pageStore;
@@ -140,6 +141,9 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId
/** Future which will be completed when task requested to be closed. Will be executed on system pool. */
private volatile CompletableFuture<Void> closeFut;
+ /** Pointer to {@link ClusterSnapshotRecord}. */
+ private volatile @Nullable WALPointer snpPtr;
+
/** Flag indicates that task already scheduled on checkpoint. */
private final AtomicBoolean started = new AtomicBoolean();
@@ -209,7 +213,7 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId
}
/** {@inheritDoc} */
- @Override public boolean onDone(@Nullable Set<GroupPartitionId> res, @Nullable Throwable err) {
+ @Override public boolean onDone(@Nullable SnapshotFutureTaskResult res, @Nullable Throwable err) {
for (PageStoreSerialWriter writer : partDeltaWriters.values())
U.closeQuiet(writer);
@@ -349,7 +353,7 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId
// 1. Checkpoint holds write acquire lock and Snapshot holds PME. Then there are not any concurrent updates.
// 2. This record is written before the related CheckpointRecord, and is flushed with CheckpointRecord or instead it.
if (cctx.wal() != null) {
- cctx.wal().log(new ClusterSnapshotRecord(snpName));
+ snpPtr = cctx.wal().log(new ClusterSnapshotRecord(snpName));
ctx.walFlush(true);
}
@@ -614,7 +618,7 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
- closeFut = CompletableFuture.runAsync(() -> onDone(taken, err0),
+ closeFut = CompletableFuture.runAsync(() -> onDone(new SnapshotFutureTaskResult(taken, snpPtr), err0),
cctx.kernalContext().pools().getSystemExecutorService());
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTaskResult.java
new file mode 100644
index 00000000000..0f2acac059e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTaskResult.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of {@link SnapshotFutureTask}: snapshot partitions and optional WAL pointer to {@link ClusterSnapshotRecord}.
+ */
+class SnapshotFutureTaskResult {
+ /** Partitions for which snapshot was created. */
+ private final Set<GroupPartitionId> parts;
+
+ /** Pointer to {@link ClusterSnapshotRecord} in WAL, or {@code null} if the record was not logged. */
+ private final @Nullable WALPointer snpPtr;
+
+ /** Wraps {@code parts} into an unmodifiable view; {@code snpPtr} may be {@code null}. */
+ SnapshotFutureTaskResult(Set<GroupPartitionId> parts, @Nullable WALPointer snpPtr) {
+ this.parts = Collections.unmodifiableSet(parts);
+ this.snpPtr = snpPtr;
+ }
+
+ /** @return Unmodifiable view of partitions for which snapshot was created. */
+ Set<GroupPartitionId> parts() {
+ return parts;
+ }
+
+ /** @return Pointer to {@link ClusterSnapshotRecord} in WAL, or {@code null} if the record was not logged. */
+ @Nullable WALPointer snapshotPointer() {
+ return snpPtr;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
index 009c0b8849b..e1f8e4e2e6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
@@ -29,7 +29,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -68,6 +70,9 @@ public class SnapshotMetadata implements Serializable {
@GridToStringInclude
private final Set<String> bltNodes;
+ /** WAL pointer to {@link ClusterSnapshotRecord} if exists. */
+ private final @Nullable WALPointer snpRecPtr;
+
/**
* Map of cache group partitions from which snapshot has been taken on the local node. This map can be empty
* since for instance, due to the node filter there is no cache data on node.
@@ -86,6 +91,7 @@ public class SnapshotMetadata implements Serializable {
* @param pageSize Page size of stored snapshot data.
* @param grpIds The list of cache groups ids which were included into snapshot.
* @param bltNodes The set of affected by snapshot baseline nodes.
+ * @param snpRecPtr WAL pointer to {@link ClusterSnapshotRecord} if exists.
* @param masterKeyDigest Master key digest for encrypted caches.
*/
public SnapshotMetadata(
@@ -97,6 +103,7 @@ public class SnapshotMetadata implements Serializable {
List<Integer> grpIds,
Set<String> bltNodes,
Set<GroupPartitionId> pairs,
+ @Nullable WALPointer snpRecPtr,
@Nullable byte[] masterKeyDigest
) {
this.rqId = rqId;
@@ -106,6 +113,7 @@ public class SnapshotMetadata implements Serializable {
this.pageSize = pageSize;
this.grpIds = grpIds;
this.bltNodes = bltNodes;
+ this.snpRecPtr = snpRecPtr;
this.masterKeyDigest = masterKeyDigest;
pairs.forEach(p ->
@@ -170,6 +178,13 @@ public class SnapshotMetadata implements Serializable {
return Collections.unmodifiableMap(locParts);
}
+ /**
+ * @return WAL pointer to {@link ClusterSnapshotRecord} if exists.
+ */
+ public @Nullable WALPointer snapshotRecordPointer() {
+ return snpRecPtr;
+ }
+
/** Save the state of this <tt>HashMap</tt> partitions and cache groups to a stream. */
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SnapshotView.java b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SnapshotView.java
index 3b17ca28bdd..6519a76a796 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SnapshotView.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SnapshotView.java
@@ -17,7 +17,11 @@
package org.apache.ignite.spi.systemview.view;
+import java.util.Collection;
import org.apache.ignite.internal.managers.systemview.walker.Order;
+import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata;
+import org.apache.ignite.internal.util.typedef.F;
/**
* Snapshot representation for a {@link SystemView}.
@@ -41,22 +45,23 @@ public class SnapshotView {
/** Cache group names that were included in the snapshot. */
private final String cacheGrps;
+ /** WAL segment that contains {@link ClusterSnapshotRecord} if exists. */
+ private final Long snpRecSeg;
+
/**
- * @param name Snapshot name.
- * @param consistentId Node consistent ID.
- * @param baselineNodes Baseline nodes affected by the snapshot.
+ * @param meta Snapshot metadata.
* @param cacheGrps Cache group names that were included in the snapshot.
*/
public SnapshotView(
- String name,
- String consistentId,
- String baselineNodes,
- String cacheGrps
+ SnapshotMetadata meta,
+ Collection<String> cacheGrps
) {
- this.name = name;
- this.consistentId = consistentId;
- this.baselineNodes = baselineNodes;
- this.cacheGrps = cacheGrps;
+ name = meta.snapshotName();
+ consistentId = meta.consistentId();
+ baselineNodes = F.concat(meta.baselineNodes(), ",");
+ snpRecSeg = meta.snapshotRecordPointer() == null ? null : meta.snapshotRecordPointer().index();
+
+ this.cacheGrps = F.concat(cacheGrps, ",");
}
/**
@@ -90,4 +95,12 @@ public class SnapshotView {
public String cacheGroups() {
return cacheGrps;
}
+
+ /**
+ * @return WAL segment that contains {@link ClusterSnapshotRecord} if exists.
+ */
+ @Order(4)
+ public Long snapshotRecordSegment() {
+ return snpRecSeg;
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java
index d391435979f..1db5a941956 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java
@@ -2101,6 +2101,7 @@ public class SystemViewSelfTest extends GridCommonAbstractTest {
assertEquals(testSnap0, view.name());
assertEquals(ignite.localNode().consistentId().toString(), view.consistentId());
+ assertNotNull(view.snapshotRecordSegment());
Collection<?> constIds = F.nodeConsistentIds(ignite.cluster().nodes());
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWalRecordTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWalRecordTest.java
index cbd70523f13..6db4f0ad006 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWalRecordTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWalRecordTest.java
@@ -21,9 +21,12 @@ import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -42,9 +45,13 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.systemview.view.SnapshotView;
+import org.apache.ignite.spi.systemview.view.SystemView;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
+import static org.apache.ignite.spi.systemview.view.SnapshotView.SNAPSHOT_SYS_VIEW;
+
/** */
public class IgniteClusterSnapshotWalRecordTest extends AbstractSnapshotSelfTest {
/**
@@ -58,23 +65,33 @@ public class IgniteClusterSnapshotWalRecordTest extends AbstractSnapshotSelfTest
CountDownLatch loadStopLatch = new CountDownLatch(1);
// Start changing data concurrently with performing the ClusterSnapshot operation.
- IgniteInternalFuture<?> loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
- Random r = new Random();
+ IgniteInternalFuture<?> loadFut = null;
- while (loadStopLatch.getCount() > 0) {
- int key = r.nextInt(CACHE_KEYS_RANGE);
+ try {
+ loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+ Random r = new Random();
- Account acc = new Account(r.nextInt(), r.nextInt());
+ while (loadStopLatch.getCount() > 0 && !Thread.interrupted()) {
+ int key = r.nextInt(CACHE_KEYS_RANGE);
- ign.cache(DEFAULT_CACHE_NAME).put(key, acc);
- }
- }, 5, "cache-loader-");
+ Account acc = new Account(r.nextInt(), r.nextInt());
- snp(ign).createSnapshot(SNAPSHOT_NAME).get();
+ ign.cache(DEFAULT_CACHE_NAME).put(key, acc);
+ }
+ }, 5, "cache-loader-");
- loadStopLatch.countDown();
+ snp(ign).createSnapshot(SNAPSHOT_NAME).get();
- loadFut.get();
+ loadStopLatch.countDown();
+
+ loadFut.get();
+ }
+ catch (Throwable err) {
+ if (loadFut != null)
+ loadFut.cancel();
+
+ throw err;
+ }
T2<Map<Integer, Account>, Map<Integer, Account>> data = parseWalCacheState(ign, SNAPSHOT_NAME);
@@ -93,6 +110,58 @@ public class IgniteClusterSnapshotWalRecordTest extends AbstractSnapshotSelfTest
assertCacheKeys(snpIgn.cache(DEFAULT_CACHE_NAME), snpData);
}
+ /** Checks that the WAL pointer of {@link ClusterSnapshotRecord} is saved in snapshot metadata and system view. */
+ @Test
+ public void testClusterSnapshotRecordIsWrittenToSnapshotMetadata() throws Exception {
+ int nodes = 3;
+ int snapshots = 10;
+
+ startGridsWithCache(nodes, 1, key -> new Account(key, key),
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ );
+
+ for (int i = 0; i < snapshots; i++) {
+ // Create snapshots sequentially; no concurrent data load is performed in this test.
+ snp(grid(0)).createSnapshot(SNAPSHOT_NAME + i).get();
+ }
+
+ for (int i = 0; i < nodes; i++) {
+ IgniteEx ign = grid(i);
+
+ ign.context().cache().context().wal().flush(null, true);
+
+ WALIterator walIt = wal(grid(i));
+ // NOTE(review): walIt is never closed in this method - confirm whether WALIterator must be closed.
+ long snpCnt = 0;
+
+ SystemView<SnapshotView> snpView = ign.context().systemView().view(SNAPSHOT_SYS_VIEW);
+
+ for (IgniteBiTuple<WALPointer, WALRecord> tuple: walIt) {
+ WALRecord rec = tuple.getValue();
+
+ if (rec.type() == WALRecord.RecordType.CLUSTER_SNAPSHOT) {
+ SnapshotMetadata metadata = snp(grid(i)).readSnapshotMetadata(
+ snp(grid(i)).snapshotLocalDir(SNAPSHOT_NAME + snpCnt),
+ (String)grid(i).configuration().getConsistentId());
+
+ assertEquals(tuple.getKey(), metadata.snapshotRecordPointer());
+
+ List<SnapshotView> snpNodesView = StreamSupport.stream(snpView.spliterator(), false)
+ .filter(v -> v.name().equals(metadata.snapshotName()))
+ .filter(v -> v.consistentId().equals(ign.localNode().consistentId()))
+ .filter(v -> v.snapshotRecordSegment().equals(tuple.getKey().index()))
+ .collect(Collectors.toList());
+
+ assertEquals(1, snpNodesView.size());
+
+ snpCnt++;
+ }
+ }
+
+ assertEquals(snapshots, snpCnt);
+ }
+ }
+
/**
* Parsing WAL files and dumping cache states: fisrst is before {@link ClusterSnapshotRecord} was written, and second
* is after all load operations stopped.