You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2023/12/20 19:11:09 UTC
(ignite) branch master updated: IGNITE-21125 Prevent duplicate keys written to dump (#11112)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b583b7dc201 IGNITE-21125 Prevent duplicate keys written to dump (#11112)
b583b7dc201 is described below
commit b583b7dc201764f93ea65eb9ff554a0cdaa57957
Author: Nikolay <ni...@apache.org>
AuthorDate: Wed Dec 20 22:11:01 2023 +0300
IGNITE-21125 Prevent duplicate keys written to dump (#11112)
---
.../snapshot/dump/CreateDumpFutureTask.java | 158 ++++++++++++++++-----
.../cache/persistence/snapshot/dump/Dump.java | 16 ---
.../processors/cache/tree/CacheDataTree.java | 11 ++
3 files changed, 130 insertions(+), 55 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
index 3f181b71a97..050ef15daff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
@@ -47,12 +47,15 @@ import org.apache.ignite.dump.DumpEntry;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
@@ -60,6 +63,8 @@ import org.apache.ignite.internal.processors.cache.persistence.snapshot.Abstract
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFutureTaskResult;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotSender;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
import org.apache.ignite.internal.util.BasicRateLimiter;
@@ -266,6 +271,9 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
long entriesCnt0 = 0;
long writtenEntriesCnt0 = 0;
+ int coCtxCache = 0;
+ CacheObjectContext coCtx = null;
+
try (PartitionDumpContext dumpCtx = dumpContext(grp, part)) {
try (GridCloseableIterator<CacheDataRow> rows = gctx.offheap().reservedIterator(part, dumpCtx.topVer)) {
if (rows == null)
@@ -278,12 +286,21 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
int cache = row.cacheId() == 0 ? grp : row.cacheId();
- if (dumpCtx.writeForIterator(cache, row.expireTime(), row.key(), row.value(), row.version()))
+ if (cache != coCtxCache) {
+ coCtxCache = cache;
+
+ coCtx = cctx.cacheObjectContext(coCtxCache);
+ }
+
+ if (dumpCtx.writeForIterator(cache, row.expireTime(), row.key(), row.value(), row.version(), coCtx))
writtenEntriesCnt0++;
entriesCnt0++;
}
}
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
entriesCnt.addAndGet(entriesCnt0);
writtenEntriesCnt.addAndGet(writtenEntriesCnt0);
@@ -297,7 +314,6 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
", iterEntriesCnt=" + entriesCnt +
", writtenIterEntriesCnt=" + entriesCnt +
", changedEntriesCnt=" + changedEntriesCnt + ']');
-
}
}
})).collect(Collectors.toList());
@@ -333,7 +349,7 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
dumpContext(grp, part).writeChanged(cctx.cacheId(), expireTime, key, val, ver);
}
- catch (IgniteException e) {
+ catch (IgniteException | IgniteCheckedException | IOException e) {
acceptException(e);
}
}
@@ -418,6 +434,12 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
*/
final Map<Integer, Set<KeyCacheObject>> changed;
+ /** Cache id for {@link #iterLastKey} */
+ int iterLastKeyCache;
+
+ /** Last key dumped via {@link #writeForIterator(int, long, KeyCacheObject, CacheObject, GridCacheVersion, CacheObjectContext)}. */
+ @Nullable KeyCacheObject iterLastKey;
+
/** Count of entries changed during dump creation. */
LongAdder changedCnt = new LongAdder();
@@ -506,7 +528,13 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
* @param val Value before change.
* @param ver Version before change.
*/
- public void writeChanged(int cache, long expireTime, KeyCacheObject key, CacheObject val, GridCacheVersion ver) {
+ public void writeChanged(
+ int cache,
+ long expireTime,
+ KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver
+ ) throws IgniteCheckedException, IOException {
String reasonToSkip = null;
if (closed) // Quick exit. Partition already saved in dump.
@@ -517,16 +545,25 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
try {
if (closed) // Partition already saved in dump.
reasonToSkip = "partition already saved";
- else if (isAfterStart(ver))
+ else if (afterStart(ver))
reasonToSkip = "greater version";
- else if (!changed.get(cache).add(key)) // Entry changed several time during dump.
- reasonToSkip = "changed several times";
- else if (val == null)
- reasonToSkip = "newly created or already removed"; // Previous value is null. Entry created after dump start, skip.
else {
- write(cache, expireTime, key, val, ver);
-
- changedCnt.increment();
+ CacheObjectContext coCtx = cctx.cacheObjectContext(cache);
+
+ synchronized (serializer) { // Prevent concurrent access to the dump file.
+ if (writtenByIterator(cache, key, coCtx))
+ reasonToSkip = "written by iterator"; // Saved by iterator, already. Skip.
+ else if (!changed.get(cache).add(key)) // Entry changed several time during dump.
+ reasonToSkip = "changed several times";
+ else if (val == null)
+ // Previous value is null. Entry created after dump start, skip.
+ reasonToSkip = "newly created or already removed";
+ else
+ write(cache, expireTime, key, val, ver, coCtx);
+ }
+
+ if (reasonToSkip == null)
+ changedCnt.increment();
}
}
finally {
@@ -539,7 +576,9 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
", cache=" + cache +
", part=" + part +
", key=" + key +
- ", written=" + (reasonToSkip == null ? "true" : reasonToSkip) + ']');
+ ", ver=" + ver +
+ ", written=" + (reasonToSkip == null ? "true" : reasonToSkip) +
+ ", startVer=" + (startVer != null) + ']');
}
}
@@ -559,16 +598,24 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
long expireTime,
KeyCacheObject key,
CacheObject val,
- GridCacheVersion ver
- ) {
- boolean written = true;
+ GridCacheVersion ver,
+ CacheObjectContext coCtx
+ ) throws IgniteCheckedException, IOException {
+ String reason = null;
- if (isAfterStart(ver))
- written = false;
- else if (changed.get(cache).contains(key))
- written = false;
- else
- write(cache, expireTime, key, val, ver);
+ if (afterStart(ver))
+ reason = "greater version";
+ else {
+ synchronized (serializer) { // Prevent concurrent access to the dump file.
+ iterLastKeyCache = cache;
+ iterLastKey = key;
+
+ if (changed.get(cache).contains(key))
+ reason = "written by listener";
+ else
+ write(cache, expireTime, key, val, ver, coCtx);
+ }
+ }
if (log.isTraceEnabled()) {
log.trace("Iterator [" +
@@ -576,30 +623,31 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
", cache=" + cache +
", part=" + part +
", key=" + key +
- ", written=" + written +
- ", ver=" + ver + ']');
+ ", written=" + (reason == null ? "true" : reason) +
+ ", ver=" + ver +
+ ", startVer=" + (startVer != null) + ']');
}
- return written;
+ return reason == null;
}
/** */
- private void write(int cache, long expireTime, KeyCacheObject key, CacheObject val, GridCacheVersion ver) {
- synchronized (serializer) { // Prevent concurrent access to the dump file.
- try {
- ByteBuffer buf = serializer.writeToBuffer(cache, expireTime, key, val, ver, cctx.cacheObjectContext(cache));
+ private void write(
+ int cache,
+ long expireTime,
+ KeyCacheObject key,
+ CacheObject val,
+ GridCacheVersion ver,
+ CacheObjectContext coCtx
+ ) throws IgniteCheckedException, IOException {
+ ByteBuffer buf = serializer.writeToBuffer(cache, expireTime, key, val, ver, coCtx);
- rateLimiter.acquire(buf.limit());
+ rateLimiter.acquire(buf.limit());
- if (file.writeFully(buf) != buf.limit())
- throw new IgniteException("Can't write row");
+ if (file.writeFully(buf) != buf.limit())
+ throw new IgniteException("Can't write row");
- processedSize.addAndGet(buf.limit());
- }
- catch (IOException | IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
+ processedSize.addAndGet(buf.limit());
}
/**
@@ -610,10 +658,42 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
* @param ver Entry version.
* @return {@code True} if {@code ver} appeared after dump started.
*/
- private boolean isAfterStart(GridCacheVersion ver) {
+ private boolean afterStart(GridCacheVersion ver) {
return (startVer != null && ver.isGreater(startVer)) && !isolatedStreamerVer.equals(ver);
}
+ /**
+ * Iterator returned by {@link IgniteCacheOffheapManager#reservedIterator(int, AffinityTopologyVersion)}
+ * iterates keys in the ascending order defined by {@link CacheDataTree#compare(BPlusIO, long, int, CacheSearchRow)}.
+ * So if a key changed by the user (see {@link #writeChanged(int, long, KeyCacheObject, CacheObject, GridCacheVersion)})
+ * is greater than the last key written by the partition iterator, then it hasn't been saved in the dump yet and must be written.
+ * Otherwise, the key has already been saved by the iterator and must be skipped.
+ *
+ * @param cache Cache id.
+ * @param key Key to write with {@link #writeChanged(int, long, KeyCacheObject, CacheObject, GridCacheVersion)}.
+ * @return {@code True} if the key has already been written by the iterator, {@code false} otherwise.
+ * @see CacheDataTree#compareBytes(byte[], byte[])
+ */
+ private boolean writtenByIterator(int cache, KeyCacheObject key, CacheObjectContext coCtx) throws IgniteCheckedException {
+ if (iterLastKey == null)
+ return false;
+
+ int cmp = Integer.compare(iterLastKeyCache, cache);
+
+ if (cmp != 0)
+ return cmp > 0;
+
+ cmp = Integer.compare(iterLastKey.hashCode(), key.hashCode());
+
+ if (cmp != 0)
+ return cmp > 0;
+
+ byte[] bytes1 = iterLastKey.valueBytes(coCtx);
+ byte[] bytes2 = key.valueBytes(coCtx);
+
+ return CacheDataTree.compareBytes(bytes1, bytes2) >= 0;
+ }
+
/** {@inheritDoc} */
@Override public void close() {
synchronized (this) { // Prevent concurrent close invocation.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
index 0eb68b5b8e2..215a7e9570f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
@@ -26,12 +26,10 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
@@ -304,8 +302,6 @@ public class Dump implements AutoCloseable {
return new DumpedPartitionIterator() {
DumpEntry next;
- Set<Object> partKeys = new HashSet<>();
-
/** {@inheritDoc} */
@Override public boolean hasNext() {
advance();
@@ -334,16 +330,6 @@ public class Dump implements AutoCloseable {
try {
next = serializer.read(dumpFile, group, part);
-
- /*
- * During dumping entry can be dumped twice: by partition iterator and change listener.
- * Excluding duplicates keys from iteration.
- */
- while (next != null && !partKeys.add(next.key()))
- next = serializer.read(dumpFile, group, part);
-
- if (next == null)
- partKeys = null; // Let GC do the rest.
}
catch (IOException | IgniteCheckedException e) {
throw new IgniteException(e);
@@ -353,8 +339,6 @@ public class Dump implements AutoCloseable {
/** {@inheritDoc} */
@Override public void close() {
U.closeQuiet(dumpFile);
-
- partKeys = null;
}
};
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index 9e0029cc9ff..9b29b6cdcdb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.tree;
+import java.util.Comparator;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.pagemem.PageUtils;
@@ -473,6 +474,16 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
byte[] bytes1 = other.key().valueBytes(grp.cacheObjectContext());
byte[] bytes2 = key.valueBytes(grp.cacheObjectContext());
+ return compareBytes(bytes1, bytes2);
+ }
+
+ /**
+ * @param bytes1 First key bytes.
+ * @param bytes2 Second key bytes.
+ * @return Comparison result of the byte arrays according to the {@link Comparator#compare(Object, Object)} contract.
+ * @see CacheDataTree#compareKeys(KeyCacheObject, long)
+ */
+ public static int compareBytes(byte[] bytes1, byte[] bytes2) {
int lenCmp = Integer.compare(bytes1.length, bytes2.length);
if (lenCmp != 0)