You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/09/23 13:43:04 UTC

[ignite] branch IGNITE-17177_inc_snapshots updated: IGNITE-17645 Write lastSnpSeg into SnapshotMetadata (#10247)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch IGNITE-17177_inc_snapshots
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/IGNITE-17177_inc_snapshots by this push:
     new f13743d8be6 IGNITE-17645 Write lastSnpSeg into SnapshotMetadata (#10247)
f13743d8be6 is described below

commit f13743d8be60e0023c04430ff2bf6cafbe3dd06a
Author: Maksim Timonin <ti...@gmail.com>
AuthorDate: Fri Sep 23 16:42:57 2022 +0300

    IGNITE-17645 Write lastSnpSeg into SnapshotMetadata (#10247)
---
 .../systemview/walker/SnapshotViewWalker.java      |  4 +-
 .../snapshot/IgniteSnapshotManager.java            |  7 +-
 .../persistence/snapshot/SnapshotFutureTask.java   | 12 ++-
 .../snapshot/SnapshotFutureTaskResult.java         | 52 +++++++++++++
 .../persistence/snapshot/SnapshotMetadata.java     | 15 ++++
 .../ignite/spi/systemview/view/SnapshotView.java   | 35 ++++++---
 .../ignite/internal/metric/SystemViewSelfTest.java |  1 +
 .../IgniteClusterSnapshotWalRecordTest.java        | 91 +++++++++++++++++++---
 8 files changed, 188 insertions(+), 29 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SnapshotViewWalker.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SnapshotViewWalker.java
index bee5fd35b21..9a513f10699 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SnapshotViewWalker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/SnapshotViewWalker.java
@@ -33,6 +33,7 @@ public class SnapshotViewWalker implements SystemViewRowAttributeWalker<Snapshot
         v.accept(1, "consistentId", String.class);
         v.accept(2, "baselineNodes", String.class);
         v.accept(3, "cacheGroups", String.class);
+        v.accept(4, "snapshotRecordSegment", Long.class);
     }
 
     /** {@inheritDoc} */
@@ -41,10 +42,11 @@ public class SnapshotViewWalker implements SystemViewRowAttributeWalker<Snapshot
         v.accept(1, "consistentId", String.class, row.consistentId());
         v.accept(2, "baselineNodes", String.class, row.baselineNodes());
         v.accept(3, "cacheGroups", String.class, row.cacheGroups());
+        v.accept(4, "snapshotRecordSegment", Long.class, row.snapshotRecordSegment());
     }
 
     /** {@inheritDoc} */
     @Override public int count() {
-        return 4;
+        return 5;
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index e19e5c38a75..e27d6161d53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -822,6 +822,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
 
                 snpDir.mkdirs();
 
+                SnapshotFutureTaskResult res = (SnapshotFutureTaskResult)fut.result();
+
                 SnapshotMetadata meta = new SnapshotMetadata(req.requestId(),
                     req.snapshotName(),
                     cctx.localNode().consistentId().toString(),
@@ -829,7 +831,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                     cctx.gridConfig().getDataStorageConfiguration().getPageSize(),
                     grpIds,
                     blts,
-                    (Set<GroupPartitionId>)fut.result(),
+                    res.parts(),
+                    res.snapshotPointer(),
                     cctx.gridConfig().getEncryptionSpi().masterKeyDigest()
                 );
 
@@ -2183,7 +2186,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         List<File> dirs = snapshotCacheDirectories(meta.snapshotName(), null, meta.folderName(), name -> true);
         Collection<String> cacheGrps = F.viewReadOnly(dirs, FilePageStoreManager::cacheGroupName);
 
-        return new SnapshotView(meta.snapshotName(), meta.consistentId(), F.concat(meta.baselineNodes(), ","), F.concat(cacheGrps, ","));
+        return new SnapshotView(meta, cacheGrps);
     }
 
     /** @return Snapshot handlers. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 579da37ee8e..aa5326aa9af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
 import org.apache.ignite.internal.processors.marshaller.MappedName;
 import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
@@ -94,7 +95,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.snapshot.I
  * If partitions for a particular cache group are not provided then they will be collected and added
  * on checkpoint under the write-lock.
  */
-class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId>> implements CheckpointListener {
+class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskResult> implements CheckpointListener {
     /** File page store manager for accessing cache group associated files. */
     private final FilePageStoreManager pageStore;
 
@@ -140,6 +141,9 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId
     /** Future which will be completed when task requested to be closed. Will be executed on system pool. */
     private volatile CompletableFuture<Void> closeFut;
 
+    /** Pointer to {@link ClusterSnapshotRecord}. */
+    private volatile @Nullable WALPointer snpPtr;
+
     /** Flag indicates that task already scheduled on checkpoint. */
     private final AtomicBoolean started = new AtomicBoolean();
 
@@ -209,7 +213,7 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(@Nullable Set<GroupPartitionId> res, @Nullable Throwable err) {
+    @Override public boolean onDone(@Nullable SnapshotFutureTaskResult res, @Nullable Throwable err) {
         for (PageStoreSerialWriter writer : partDeltaWriters.values())
             U.closeQuiet(writer);
 
@@ -349,7 +353,7 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId
             // 1. Checkpoint holds write acquire lock and Snapshot holds PME. Then there are not any concurrent updates.
             // 2. This record is written before the related CheckpointRecord, and is flushed with the CheckpointRecord or instead of it.
             if (cctx.wal() != null) {
-                cctx.wal().log(new ClusterSnapshotRecord(snpName));
+                snpPtr = cctx.wal().log(new ClusterSnapshotRecord(snpName));
 
                 ctx.walFlush(true);
             }
@@ -614,7 +618,7 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId
                 .map(Map.Entry::getKey)
                 .collect(Collectors.toSet());
 
-            closeFut = CompletableFuture.runAsync(() -> onDone(taken, err0),
+            closeFut = CompletableFuture.runAsync(() -> onDone(new SnapshotFutureTaskResult(taken, snpPtr), err0),
                 cctx.kernalContext().pools().getSystemExecutorService());
         }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTaskResult.java
new file mode 100644
index 00000000000..0f2acac059e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTaskResult.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents result of {@link SnapshotFutureTask}.
+ */
+class SnapshotFutureTaskResult {
+    /** Partitions for which snapshot was created. */
+    private final Set<GroupPartitionId> parts;
+
+    /** Pointer to {@link ClusterSnapshotRecord} in WAL. */
+    private final @Nullable WALPointer snpPtr;
+
+    /** Wraps {@code parts} into an unmodifiable view and stores {@code snpPtr}, which may be {@code null}. */
+    SnapshotFutureTaskResult(Set<GroupPartitionId> parts, @Nullable WALPointer snpPtr) {
+        this.parts = Collections.unmodifiableSet(parts);
+        this.snpPtr = snpPtr;
+    }
+
+    /** @return Unmodifiable view of the partitions for which snapshot was created. */
+    Set<GroupPartitionId> parts() {
+        return parts;
+    }
+
+    /** @return Pointer to {@link ClusterSnapshotRecord} in WAL, may be {@code null}. */
+    @Nullable WALPointer snapshotPointer() {
+        return snpPtr;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
index 009c0b8849b..e1f8e4e2e6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
@@ -29,7 +29,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -68,6 +70,9 @@ public class SnapshotMetadata implements Serializable {
     @GridToStringInclude
     private final Set<String> bltNodes;
 
+    /** WAL pointer to {@link ClusterSnapshotRecord} if exists. */
+    private final @Nullable WALPointer snpRecPtr;
+
     /**
      * Map of cache group partitions from which snapshot has been taken on the local node. This map can be empty
      * since for instance, due to the node filter there is no cache data on node.
@@ -86,6 +91,7 @@ public class SnapshotMetadata implements Serializable {
      * @param pageSize Page size of stored snapshot data.
      * @param grpIds The list of cache groups ids which were included into snapshot.
      * @param bltNodes The set of affected by snapshot baseline nodes.
+     * @param snpRecPtr WAL pointer to {@link ClusterSnapshotRecord} if exists.
      * @param masterKeyDigest Master key digest for encrypted caches.
      */
     public SnapshotMetadata(
@@ -97,6 +103,7 @@ public class SnapshotMetadata implements Serializable {
         List<Integer> grpIds,
         Set<String> bltNodes,
         Set<GroupPartitionId> pairs,
+        @Nullable WALPointer snpRecPtr,
         @Nullable byte[] masterKeyDigest
     ) {
         this.rqId = rqId;
@@ -106,6 +113,7 @@ public class SnapshotMetadata implements Serializable {
         this.pageSize = pageSize;
         this.grpIds = grpIds;
         this.bltNodes = bltNodes;
+        this.snpRecPtr = snpRecPtr;
         this.masterKeyDigest = masterKeyDigest;
 
         pairs.forEach(p ->
@@ -170,6 +178,13 @@ public class SnapshotMetadata implements Serializable {
         return Collections.unmodifiableMap(locParts);
     }
 
+    /**
+     * @return WAL pointer to {@link ClusterSnapshotRecord} if exists.
+     */
+    public @Nullable WALPointer snapshotRecordPointer() {
+        return snpRecPtr;
+    }
+
     /** Save the state of this <tt>HashMap</tt> partitions and cache groups to a stream. */
     private void writeObject(java.io.ObjectOutputStream s)
         throws java.io.IOException {
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SnapshotView.java b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SnapshotView.java
index 3b17ca28bdd..6519a76a796 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SnapshotView.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SnapshotView.java
@@ -17,7 +17,11 @@
 
 package org.apache.ignite.spi.systemview.view;
 
+import java.util.Collection;
 import org.apache.ignite.internal.managers.systemview.walker.Order;
+import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata;
+import org.apache.ignite.internal.util.typedef.F;
 
 /**
  * Snapshot representation for a {@link SystemView}.
@@ -41,22 +45,23 @@ public class SnapshotView {
     /** Cache group names that were included in the snapshot. */
     private final String cacheGrps;
 
+    /** WAL segment that contains {@link ClusterSnapshotRecord} if exists. */
+    private final Long snpRecSeg;
+
     /**
-     * @param name Snapshot name.
-     * @param consistentId Node consistent ID.
-     * @param baselineNodes Baseline nodes affected by the snapshot.
+     * @param meta Snapshot metadata.
      * @param cacheGrps Cache group names that were included in the snapshot.
      */
     public SnapshotView(
-        String name,
-        String consistentId,
-        String baselineNodes,
-        String cacheGrps
+        SnapshotMetadata meta,
+        Collection<String> cacheGrps
     ) {
-        this.name = name;
-        this.consistentId = consistentId;
-        this.baselineNodes = baselineNodes;
-        this.cacheGrps = cacheGrps;
+        name = meta.snapshotName();
+        consistentId = meta.consistentId();
+        baselineNodes = F.concat(meta.baselineNodes(), ",");
+        snpRecSeg = meta.snapshotRecordPointer() == null ? null : meta.snapshotRecordPointer().index();
+
+        this.cacheGrps = F.concat(cacheGrps, ",");
     }
 
     /**
@@ -90,4 +95,12 @@ public class SnapshotView {
     public String cacheGroups() {
         return cacheGrps;
     }
+
+    /**
+     * @return WAL segment that contains {@link ClusterSnapshotRecord} if exists.
+     */
+    @Order(4)
+    public Long snapshotRecordSegment() {
+        return snpRecSeg;
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java
index d391435979f..1db5a941956 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java
@@ -2101,6 +2101,7 @@ public class SystemViewSelfTest extends GridCommonAbstractTest {
 
             assertEquals(testSnap0, view.name());
             assertEquals(ignite.localNode().consistentId().toString(), view.consistentId());
+            assertNotNull(view.snapshotRecordSegment());
 
             Collection<?> constIds = F.nodeConsistentIds(ignite.cluster().nodes());
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWalRecordTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWalRecordTest.java
index cbd70523f13..6db4f0ad006 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWalRecordTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWalRecordTest.java
@@ -21,9 +21,12 @@ import java.io.File;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -42,9 +45,13 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.systemview.view.SnapshotView;
+import org.apache.ignite.spi.systemview.view.SystemView;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
+import static org.apache.ignite.spi.systemview.view.SnapshotView.SNAPSHOT_SYS_VIEW;
+
 /** */
 public class IgniteClusterSnapshotWalRecordTest extends AbstractSnapshotSelfTest {
     /**
@@ -58,23 +65,33 @@ public class IgniteClusterSnapshotWalRecordTest extends AbstractSnapshotSelfTest
         CountDownLatch loadStopLatch = new CountDownLatch(1);
 
         // Start changing data concurrently with performing the ClusterSnapshot operation.
-        IgniteInternalFuture<?> loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
-            Random r = new Random();
+        IgniteInternalFuture<?> loadFut = null;
 
-            while (loadStopLatch.getCount() > 0) {
-                int key = r.nextInt(CACHE_KEYS_RANGE);
+        try {
+            loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+                Random r = new Random();
 
-                Account acc = new Account(r.nextInt(), r.nextInt());
+                while (loadStopLatch.getCount() > 0 && !Thread.interrupted()) {
+                    int key = r.nextInt(CACHE_KEYS_RANGE);
 
-                ign.cache(DEFAULT_CACHE_NAME).put(key, acc);
-            }
-        }, 5, "cache-loader-");
+                    Account acc = new Account(r.nextInt(), r.nextInt());
 
-        snp(ign).createSnapshot(SNAPSHOT_NAME).get();
+                    ign.cache(DEFAULT_CACHE_NAME).put(key, acc);
+                }
+            }, 5, "cache-loader-");
 
-        loadStopLatch.countDown();
+            snp(ign).createSnapshot(SNAPSHOT_NAME).get();
 
-        loadFut.get();
+            loadStopLatch.countDown();
+
+            loadFut.get();
+        }
+        catch (Throwable err) {
+            if (loadFut != null)
+                loadFut.cancel();
+
+            throw err;
+        }
 
         T2<Map<Integer, Account>, Map<Integer, Account>> data = parseWalCacheState(ign, SNAPSHOT_NAME);
 
@@ -93,6 +110,58 @@ public class IgniteClusterSnapshotWalRecordTest extends AbstractSnapshotSelfTest
         assertCacheKeys(snpIgn.cache(DEFAULT_CACHE_NAME), snpData);
     }
 
+    /** Checks that WAL pointers of {@link ClusterSnapshotRecord}s match snapshot metadata and the snapshot system view. */
+    @Test
+    public void testClusterSnapshotRecordIsWrittenToSnapshotMetadata() throws Exception {
+        int nodes = 3;
+        int snapshots = 10;
+
+        startGridsWithCache(nodes, 1, key -> new Account(key, key),
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+        );
+
+        for (int i = 0; i < snapshots; i++) {
+            // Take snapshots one by one, waiting for each to finish; no load threads are started here.
+            snp(grid(0)).createSnapshot(SNAPSHOT_NAME + i).get();
+        }
+
+        for (int i = 0; i < nodes; i++) {
+            IgniteEx ign = grid(i);
+
+            ign.context().cache().context().wal().flush(null, true);
+
+            WALIterator walIt = wal(grid(i));
+
+            long snpCnt = 0;
+
+            SystemView<SnapshotView> snpView = ign.context().systemView().view(SNAPSHOT_SYS_VIEW);
+
+            for (IgniteBiTuple<WALPointer, WALRecord> tuple: walIt) {
+                WALRecord rec = tuple.getValue();
+
+                if (rec.type() == WALRecord.RecordType.CLUSTER_SNAPSHOT) {
+                    SnapshotMetadata metadata = snp(grid(i)).readSnapshotMetadata(
+                        snp(grid(i)).snapshotLocalDir(SNAPSHOT_NAME + snpCnt),
+                        (String)grid(i).configuration().getConsistentId());
+
+                    assertEquals(tuple.getKey(), metadata.snapshotRecordPointer());
+
+                    List<SnapshotView> snpNodesView = StreamSupport.stream(snpView.spliterator(), false)
+                        .filter(v -> v.name().equals(metadata.snapshotName()))
+                        .filter(v -> v.consistentId().equals(ign.localNode().consistentId()))
+                        .filter(v -> v.snapshotRecordSegment().equals(tuple.getKey().index()))
+                        .collect(Collectors.toList());
+
+                    assertEquals(1, snpNodesView.size());
+
+                    snpCnt++;
+                }
+            }
+
+            assertEquals(snapshots, snpCnt);
+        }
+    }
+
     /**
      * Parsing WAL files and dumping cache states: first is before {@link ClusterSnapshotRecord} was written, and second
      * is after all load operations stopped.