You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ti...@apache.org on 2024/02/09 13:32:51 UTC

(ignite) branch master updated: IGNITE-21236 Fix dump doesn't contain data inserted before node restart (#11233)

This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ad3f0f4523 IGNITE-21236 Fix dump doesn't contain data inserted before node restart (#11233)
3ad3f0f4523 is described below

commit 3ad3f0f45239472745392ae4820f04b24527b5e1
Author: Vladimir Steshin <vl...@gmail.com>
AuthorDate: Fri Feb 9 16:32:42 2024 +0300

    IGNITE-21236 Fix dump doesn't contain data inserted before node restart (#11233)
---
 .../snapshot/dump/CreateDumpFutureTask.java        |  57 ++++++------
 .../snapshot/dump/IgniteCacheDumpSelf2Test.java    | 103 +++++++++++++++++++++
 2 files changed, 129 insertions(+), 31 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
index 3b6790fc64b..1267e18bf88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
@@ -137,6 +137,15 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
      */
     private final @Nullable ConcurrentMap<Long, ByteBuffer> encThLocBufs;
 
+    /**
+     * Cache entry version at the dump start.
+     * Regular updates with {@link IgniteCache#put(Object, Object)} and similar calls use version generated with
+     * {@link GridCacheVersionManager#next(GridCacheVersion)}. This version value monotonically grows. It is generated
+     * on a <b>primary</b> node and is <b>propagated</b> to the backups. So, on a primary we can distinguish updates
+     * that happen before and after the dump start.
+     */
+    private GridCacheVersion startVer;
+
     /**
      * @param cctx Cache context.
      * @param srcNodeId Node id which cause snapshot task creation.
@@ -211,6 +220,8 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
 
     /** Prepares all data structures to dump entries. */
     private void prepare() throws IOException, IgniteCheckedException {
+        startVer = cctx.versions().next(cctx.kernalContext().discovery().topologyVersion());
+
         for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
             int grp = e.getKey();
 
@@ -424,47 +435,28 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
      */
     private class PartitionDumpContext implements Closeable {
         /** Group id. */
-        final int grp;
+        private final int grp;
 
         /** Partition id. */
-        final int part;
+        private final int part;
 
         /**
          * Key is cache id, values is set of keys dumped via
          * {@link #writeChanged(int, long, KeyCacheObject, CacheObject, GridCacheVersion)}.
          */
-        final Map<Integer, Set<KeyCacheObject>> changed;
+        private final Map<Integer, Set<KeyCacheObject>> changed;
 
         /** Cache id for {@link #iterLastKey} */
-        int iterLastKeyCache;
+        private int iterLastKeyCache;
 
         /** Last key dumped via {@link #writeForIterator(int, long, KeyCacheObject, CacheObject, GridCacheVersion, CacheObjectContext)}. */
-        @Nullable KeyCacheObject iterLastKey;
+        private @Nullable KeyCacheObject iterLastKey;
 
         /** Count of entries changed during dump creation. */
-        LongAdder changedCnt = new LongAdder();
+        private final LongAdder changedCnt = new LongAdder();
 
         /** Partition dump file. Lazily initialized to prevent creation files for empty partitions. */
-        final FileIO file;
-
-        /**
-         * Regular updates with {@link IgniteCache#put(Object, Object)} and similar calls
-         * will use version generated with {@link GridCacheVersionManager#next(GridCacheVersion)}.
-         * Version is monotonically increase.
-         * Version generated on <b>primary</b> node and propagated to backups.
-         * So on primary we can distinguish updates that happens before and after dump start comparing versions
-         * with the version we read with {@link GridCacheVersionManager#last()}.
-         */
-        @Nullable final GridCacheVersion startVer;
-
-        /**
-         * Unlike regular update, {@link IgniteDataStreamer} updates receive the same version for all entries.
-         * See {@code IsolatedUpdater.receive}.
-         * Note, using {@link IgniteDataStreamer} during cache dump creation can lead to dump inconsistency.
-         *
-         * @see GridCacheVersionManager#isolatedStreamerVersion()
-         */
-        final GridCacheVersion isolatedStreamerVer;
+        private final FileIO file;
 
         /** Topology Version. */
         private final AffinityTopologyVersion topVer;
@@ -473,7 +465,7 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
         private final DumpEntrySerializer serializer;
 
         /** If {@code true} context is closed. */
-        volatile boolean closed;
+        private volatile boolean closed;
 
         /**
          * Count of writers. When count becomes {@code 0} context must be closed.
@@ -483,11 +475,14 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
          */
         private final AtomicInteger writers = new AtomicInteger(1);
 
+        /** Primary node/partition flag. */
+        private final boolean primary;
+
         /**
          * @param gctx Group context.
          * @param part Partition id.
          */
-        public PartitionDumpContext(CacheGroupContext gctx, int part) {
+        private PartitionDumpContext(CacheGroupContext gctx, int part) {
             assert gctx != null;
 
             try {
@@ -495,8 +490,7 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
                 grp = gctx.groupId();
                 topVer = gctx.topology().lastTopologyChangeVersion();
 
-                startVer = grpPrimaries.get(gctx.groupId()).contains(part) ? gctx.shared().versions().last() : null;
-                isolatedStreamerVer = cctx.versions().isolatedStreamerVersion();
+                primary = grpPrimaries.get(gctx.groupId()).contains(part);
 
                 serializer = new DumpEntrySerializer(
                     thLocBufs,
@@ -658,9 +652,10 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
          *
          * @param ver Entry version.
          * @return {@code True} if {@code ver} appeared after dump started.
+         * @see GridCacheVersionManager#isolatedStreamerVersion()
          */
         private boolean afterStart(GridCacheVersion ver) {
-            return (startVer != null && ver.isGreater(startVer)) && !isolatedStreamerVer.equals(ver);
+            return primary && ver.isGreater(startVer);
         }
 
         /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java
index 7acf0f30bf4..fd2c0e44b2f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java
@@ -26,6 +26,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -41,13 +42,19 @@ import java.util.stream.IntStream;
 import java.util.zip.ZipInputStream;
 import org.apache.commons.io.IOUtils;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryVersion;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cdc.TypeMapping;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.dump.DumpConsumer;
 import org.apache.ignite.dump.DumpEntry;
 import org.apache.ignite.dump.DumpReader;
 import org.apache.ignite.dump.DumpReaderConfiguration;
@@ -64,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.TestDumpConsumer;
@@ -123,6 +131,9 @@ public class IgniteCacheDumpSelf2Test extends GridCommonAbstractTest {
     /** */
     private LogListener lsnr;
 
+    /** */
+    private boolean persistence;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -135,6 +146,11 @@ public class IgniteCacheDumpSelf2Test extends GridCommonAbstractTest {
             cfg.setGridLogger(testLog);
         }
 
+        if (persistence) {
+            cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)));
+        }
+
         return cfg;
     }
 
@@ -142,11 +158,98 @@ public class IgniteCacheDumpSelf2Test extends GridCommonAbstractTest {
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
         stopAllGrids();
 
         cleanPersistenceDir();
     }
 
+    /** Checks a dump created just after a restart when the data was loaded with the data streamer. */
+    @Test
+    public void testDumpAfterRestartWithStreamer() throws Exception {
+        doTestDumpAfterRestart(true);
+    }
+
+    /** Checks a dump when it is created just after a restart. */
+    @Test
+    public void testDumpAfterRestart() throws Exception {
+        doTestDumpAfterRestart(false);
+    }
+
+    /** Performs the dump test where the dump is created just after a restart. */
+    private void doTestDumpAfterRestart(boolean useDataStreamer) throws Exception {
+        persistence = true;
+
+        int nodes = 2;
+
+        IgniteEx ign0 = startGrids(nodes);
+
+        ign0.cluster().state(ClusterState.ACTIVE);
+
+        ign0.createCache(defaultCacheConfiguration());
+
+        try (IgniteDataStreamer<Integer, String> ds = ign0.dataStreamer(DEFAULT_CACHE_NAME)) {
+            IgniteCache<Integer, String> cache = ign0.cache(DEFAULT_CACHE_NAME);
+
+            for (int i = 0; i < KEYS_CNT; ++i) {
+                if (useDataStreamer)
+                    ds.addData(i, "" + i);
+                else
+                    cache.put(i, "" + i);
+            }
+        }
+
+        stopAllGrids(false);
+        IgniteEx ign1 = startGrids(nodes);
+        ign1.cluster().state(ClusterState.ACTIVE);
+
+        ign1.snapshot().createDump(DMP_NAME, Collections.singletonList(DEFAULT_CACHE_NAME)).get(getTestTimeout());
+
+        ign1.destroyCache(DEFAULT_CACHE_NAME);
+
+        new DumpReader(new DumpReaderConfiguration(dumpDirectory(ign1, DMP_NAME), new DumpConsumer() {
+            @Override public void start() {
+                // No-op.
+            }
+
+            @Override public void onMappings(Iterator<TypeMapping> mappings) {
+                // No-op.
+            }
+
+            @Override public void onTypes(Iterator<BinaryType> types) {
+                // No-op.
+            }
+
+            @Override public void onCacheConfigs(Iterator<StoredCacheData> caches) {
+                caches.forEachRemaining(cacheData -> ign1.createCache(cacheData.config()));
+            }
+
+            @Override public void onPartition(int grp, int part, Iterator<DumpEntry> data) {
+                data.forEachRemaining(de ->
+                    ign1.cache(ign1.context().cache().cacheDescriptor(de.cacheId()).cacheName()).put(de.key(), de.value())
+                );
+            }
+
+            @Override public void stop() {
+                // No-op.
+            }
+        }), log).run();
+
+        IgniteCache<Integer, String> cache = ign1.cache(DEFAULT_CACHE_NAME);
+
+        assertNotNull(cache);
+        assertEquals(KEYS_CNT, cache.size());
+
+        for (int i = 0; i < KEYS_CNT; ++i)
+            assertEquals(i + "", cache.get(i));
+    }
+
     /** */
     @Test
     public void testSnapshotDirectoryCreatedLazily() throws Exception {