You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ti...@apache.org on 2024/02/09 13:32:51 UTC
(ignite) branch master updated: IGNITE-21236 Fix dump doesn't contain data inserted before node restart (#11233)
This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3ad3f0f4523 IGNITE-21236 Fix dump doesn't contain data inserted before node restart (#11233)
3ad3f0f4523 is described below
commit 3ad3f0f45239472745392ae4820f04b24527b5e1
Author: Vladimir Steshin <vl...@gmail.com>
AuthorDate: Fri Feb 9 16:32:42 2024 +0300
IGNITE-21236 Fix dump doesn't contain data inserted before node restart (#11233)
---
.../snapshot/dump/CreateDumpFutureTask.java | 57 ++++++------
.../snapshot/dump/IgniteCacheDumpSelf2Test.java | 103 +++++++++++++++++++++
2 files changed, 129 insertions(+), 31 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
index 3b6790fc64b..1267e18bf88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
@@ -137,6 +137,15 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
*/
private final @Nullable ConcurrentMap<Long, ByteBuffer> encThLocBufs;
+ /**
+ * Cache entry version at the dump start.
+ * Regular updates with {@link IgniteCache#put(Object, Object)} and similar calls use a version generated with
+ * {@link GridCacheVersionManager#next(GridCacheVersion)}. This version grows monotonically. It is generated
+ * on a <b>primary</b> node and is <b>propagated</b> to the backups. So, on a primary, we can distinguish updates
+ * that happened before the dump start from those that happened after it.
+ */
+ private GridCacheVersion startVer;
+
/**
* @param cctx Cache context.
* @param srcNodeId Node id which cause snapshot task creation.
@@ -211,6 +220,8 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
/** Prepares all data structures to dump entries. */
private void prepare() throws IOException, IgniteCheckedException {
+ startVer = cctx.versions().next(cctx.kernalContext().discovery().topologyVersion());
+
for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
int grp = e.getKey();
@@ -424,47 +435,28 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
*/
private class PartitionDumpContext implements Closeable {
/** Group id. */
- final int grp;
+ private final int grp;
/** Partition id. */
- final int part;
+ private final int part;
/**
* Key is cache id, values is set of keys dumped via
* {@link #writeChanged(int, long, KeyCacheObject, CacheObject, GridCacheVersion)}.
*/
- final Map<Integer, Set<KeyCacheObject>> changed;
+ private final Map<Integer, Set<KeyCacheObject>> changed;
/** Cache id for {@link #iterLastKey} */
- int iterLastKeyCache;
+ private int iterLastKeyCache;
/** Last key dumped via {@link #writeForIterator(int, long, KeyCacheObject, CacheObject, GridCacheVersion, CacheObjectContext)}. */
- @Nullable KeyCacheObject iterLastKey;
+ private @Nullable KeyCacheObject iterLastKey;
/** Count of entries changed during dump creation. */
- LongAdder changedCnt = new LongAdder();
+ private final LongAdder changedCnt = new LongAdder();
/** Partition dump file. Lazily initialized to prevent creation files for empty partitions. */
- final FileIO file;
-
- /**
- * Regular updates with {@link IgniteCache#put(Object, Object)} and similar calls
- * will use version generated with {@link GridCacheVersionManager#next(GridCacheVersion)}.
- * Version is monotonically increase.
- * Version generated on <b>primary</b> node and propagated to backups.
- * So on primary we can distinguish updates that happens before and after dump start comparing versions
- * with the version we read with {@link GridCacheVersionManager#last()}.
- */
- @Nullable final GridCacheVersion startVer;
-
- /**
- * Unlike regular update, {@link IgniteDataStreamer} updates receive the same version for all entries.
- * See {@code IsolatedUpdater.receive}.
- * Note, using {@link IgniteDataStreamer} during cache dump creation can lead to dump inconsistency.
- *
- * @see GridCacheVersionManager#isolatedStreamerVersion()
- */
- final GridCacheVersion isolatedStreamerVer;
+ private final FileIO file;
/** Topology Version. */
private final AffinityTopologyVersion topVer;
@@ -473,7 +465,7 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
private final DumpEntrySerializer serializer;
/** If {@code true} context is closed. */
- volatile boolean closed;
+ private volatile boolean closed;
/**
* Count of writers. When count becomes {@code 0} context must be closed.
@@ -483,11 +475,14 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
*/
private final AtomicInteger writers = new AtomicInteger(1);
+ /** Whether this partition is among the primaries recorded for its group in {@code grpPrimaries}. */
+ private final boolean primary;
+
/**
* @param gctx Group context.
* @param part Partition id.
*/
- public PartitionDumpContext(CacheGroupContext gctx, int part) {
+ private PartitionDumpContext(CacheGroupContext gctx, int part) {
assert gctx != null;
try {
@@ -495,8 +490,7 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
grp = gctx.groupId();
topVer = gctx.topology().lastTopologyChangeVersion();
- startVer = grpPrimaries.get(gctx.groupId()).contains(part) ? gctx.shared().versions().last() : null;
- isolatedStreamerVer = cctx.versions().isolatedStreamerVersion();
+ primary = grpPrimaries.get(gctx.groupId()).contains(part);
serializer = new DumpEntrySerializer(
thLocBufs,
@@ -658,9 +652,10 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
*
* @param ver Entry version.
* @return {@code True} if {@code ver} appeared after dump started.
+ * @see GridCacheVersionManager#isolatedStreamerVersion()
*/
private boolean afterStart(GridCacheVersion ver) {
- return (startVer != null && ver.isGreater(startVer)) && !isolatedStreamerVer.equals(ver);
+ return primary && ver.isGreater(startVer);
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java
index 7acf0f30bf4..fd2c0e44b2f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java
@@ -26,6 +26,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -41,13 +42,19 @@ import java.util.stream.IntStream;
import java.util.zip.ZipInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cdc.TypeMapping;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.dump.DumpConsumer;
import org.apache.ignite.dump.DumpEntry;
import org.apache.ignite.dump.DumpReader;
import org.apache.ignite.dump.DumpReaderConfiguration;
@@ -64,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.TestDumpConsumer;
@@ -123,6 +131,9 @@ public class IgniteCacheDumpSelf2Test extends GridCommonAbstractTest {
/** */
private LogListener lsnr;
+ /** */
+ private boolean persistence;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -135,6 +146,11 @@ public class IgniteCacheDumpSelf2Test extends GridCommonAbstractTest {
cfg.setGridLogger(testLog);
}
+ if (persistence) {
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)));
+ }
+
return cfg;
}
@@ -142,11 +158,98 @@ public class IgniteCacheDumpSelf2Test extends GridCommonAbstractTest {
@Override protected void beforeTest() throws Exception {
super.beforeTest();
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
stopAllGrids();
cleanPersistenceDir();
}
+ /** Checks a dump created just after a restart when the data was loaded with the data streamer. */
+ @Test
+ public void testDumpAfterRestartWithStreamer() throws Exception {
+ doTestDumpAfterRestart(true);
+ }
+
+ /** Checks a dump when it is created just after a restart. */
+ @Test
+ public void testDumpAfterRestart() throws Exception {
+ doTestDumpAfterRestart(false);
+ }
+
+ /** Runs the dump test where the dump is created just after a restart. */
+ private void doTestDumpAfterRestart(boolean useDataStreamer) throws Exception {
+ persistence = true;
+
+ int nodes = 2;
+
+ IgniteEx ign0 = startGrids(nodes);
+
+ ign0.cluster().state(ClusterState.ACTIVE);
+
+ ign0.createCache(defaultCacheConfiguration());
+
+ try (IgniteDataStreamer<Integer, String> ds = ign0.dataStreamer(DEFAULT_CACHE_NAME)) {
+ IgniteCache<Integer, String> cache = ign0.cache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < KEYS_CNT; ++i) {
+ if (useDataStreamer)
+ ds.addData(i, "" + i);
+ else
+ cache.put(i, "" + i);
+ }
+ }
+
+ stopAllGrids(false);
+ IgniteEx ign1 = startGrids(nodes);
+ ign1.cluster().state(ClusterState.ACTIVE);
+
+ ign1.snapshot().createDump(DMP_NAME, Collections.singletonList(DEFAULT_CACHE_NAME)).get(getTestTimeout());
+
+ ign1.destroyCache(DEFAULT_CACHE_NAME);
+
+ new DumpReader(new DumpReaderConfiguration(dumpDirectory(ign1, DMP_NAME), new DumpConsumer() {
+ @Override public void start() {
+ // No-op.
+ }
+
+ @Override public void onMappings(Iterator<TypeMapping> mappings) {
+ // No-op.
+ }
+
+ @Override public void onTypes(Iterator<BinaryType> types) {
+ // No-op.
+ }
+
+ @Override public void onCacheConfigs(Iterator<StoredCacheData> caches) {
+ caches.forEachRemaining(cacheData -> ign1.createCache(cacheData.config()));
+ }
+
+ @Override public void onPartition(int grp, int part, Iterator<DumpEntry> data) {
+ data.forEachRemaining(de ->
+ ign1.cache(ign1.context().cache().cacheDescriptor(de.cacheId()).cacheName()).put(de.key(), de.value())
+ );
+ }
+
+ @Override public void stop() {
+ // No-op.
+ }
+ }), log).run();
+
+ IgniteCache<Integer, String> cache = ign1.cache(DEFAULT_CACHE_NAME);
+
+ assertNotNull(cache);
+ assertEquals(KEYS_CNT, cache.size());
+
+ for (int i = 0; i < KEYS_CNT; ++i)
+ assertEquals(i + "", cache.get(i));
+ }
+
/** */
@Test
public void testSnapshotDirectoryCreatedLazily() throws Exception {