You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2023/12/20 19:11:09 UTC

(ignite) branch master updated: IGNITE-21125 Prevent duplicate keys written to dump (#11112)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b583b7dc201 IGNITE-21125 Prevent duplicate keys written to dump (#11112)
b583b7dc201 is described below

commit b583b7dc201764f93ea65eb9ff554a0cdaa57957
Author: Nikolay <ni...@apache.org>
AuthorDate: Wed Dec 20 22:11:01 2023 +0300

    IGNITE-21125 Prevent duplicate keys written to dump (#11112)
---
 .../snapshot/dump/CreateDumpFutureTask.java        | 158 ++++++++++++++++-----
 .../cache/persistence/snapshot/dump/Dump.java      |  16 ---
 .../processors/cache/tree/CacheDataTree.java       |  11 ++
 3 files changed, 130 insertions(+), 55 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
index 3f181b71a97..050ef15daff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
@@ -47,12 +47,15 @@ import org.apache.ignite.dump.DumpEntry;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
@@ -60,6 +63,8 @@ import org.apache.ignite.internal.processors.cache.persistence.snapshot.Abstract
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFutureTaskResult;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotSender;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
 import org.apache.ignite.internal.util.BasicRateLimiter;
@@ -266,6 +271,9 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
             long entriesCnt0 = 0;
             long writtenEntriesCnt0 = 0;
 
+            int coCtxCache = 0;
+            CacheObjectContext coCtx = null;
+
             try (PartitionDumpContext dumpCtx = dumpContext(grp, part)) {
                 try (GridCloseableIterator<CacheDataRow> rows = gctx.offheap().reservedIterator(part, dumpCtx.topVer)) {
                     if (rows == null)
@@ -278,12 +286,21 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
 
                         int cache = row.cacheId() == 0 ? grp : row.cacheId();
 
-                        if (dumpCtx.writeForIterator(cache, row.expireTime(), row.key(), row.value(), row.version()))
+                        if (cache != coCtxCache) {
+                            coCtxCache = cache;
+
+                            coCtx = cctx.cacheObjectContext(coCtxCache);
+                        }
+
+                        if (dumpCtx.writeForIterator(cache, row.expireTime(), row.key(), row.value(), row.version(), coCtx))
                             writtenEntriesCnt0++;
 
                         entriesCnt0++;
                     }
                 }
+                catch (IOException e) {
+                    throw new IgniteException(e);
+                }
 
                 entriesCnt.addAndGet(entriesCnt0);
                 writtenEntriesCnt.addAndGet(writtenEntriesCnt0);
@@ -297,7 +314,6 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
                         ", iterEntriesCnt=" + entriesCnt +
                         ", writtenIterEntriesCnt=" + entriesCnt +
                         ", changedEntriesCnt=" + changedEntriesCnt + ']');
-
                 }
             }
         })).collect(Collectors.toList());
@@ -333,7 +349,7 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
 
             dumpContext(grp, part).writeChanged(cctx.cacheId(), expireTime, key, val, ver);
         }
-        catch (IgniteException e) {
+        catch (IgniteException | IgniteCheckedException | IOException e) {
             acceptException(e);
         }
     }
@@ -418,6 +434,12 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
          */
         final Map<Integer, Set<KeyCacheObject>> changed;
 
+        /** Cache id for {@link #iterLastKey}. */
+        int iterLastKeyCache;
+
+        /** Last key dumped via {@link #writeForIterator(int, long, KeyCacheObject, CacheObject, GridCacheVersion, CacheObjectContext)}. */
+        @Nullable KeyCacheObject iterLastKey;
+
         /** Count of entries changed during dump creation. */
         LongAdder changedCnt = new LongAdder();
 
@@ -506,7 +528,13 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
          * @param val Value before change.
          * @param ver Version before change.
          */
-        public void writeChanged(int cache, long expireTime, KeyCacheObject key, CacheObject val, GridCacheVersion ver) {
+        public void writeChanged(
+            int cache,
+            long expireTime,
+            KeyCacheObject key,
+            CacheObject val,
+            GridCacheVersion ver
+        ) throws IgniteCheckedException, IOException {
             String reasonToSkip = null;
 
             if (closed) // Quick exit. Partition already saved in dump.
@@ -517,16 +545,25 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
                 try {
                     if (closed) // Partition already saved in dump.
                         reasonToSkip = "partition already saved";
-                    else if (isAfterStart(ver))
+                    else if (afterStart(ver))
                         reasonToSkip = "greater version";
-                    else if (!changed.get(cache).add(key)) // Entry changed several time during dump.
-                        reasonToSkip = "changed several times";
-                    else if (val == null)
-                        reasonToSkip = "newly created or already removed"; // Previous value is null. Entry created after dump start, skip.
                     else {
-                        write(cache, expireTime, key, val, ver);
-
-                        changedCnt.increment();
+                        CacheObjectContext coCtx = cctx.cacheObjectContext(cache);
+
+                        synchronized (serializer) { // Prevent concurrent access to the dump file.
+                            if (writtenByIterator(cache, key, coCtx))
+                                reasonToSkip = "written by iterator"; // Saved by iterator, already. Skip.
+                            else if (!changed.get(cache).add(key)) // Entry changed several times during dump.
+                                reasonToSkip = "changed several times";
+                            else if (val == null)
+                                // Previous value is null. Entry created after dump start, skip.
+                                reasonToSkip = "newly created or already removed";
+                            else
+                                write(cache, expireTime, key, val, ver, coCtx);
+                        }
+
+                        if (reasonToSkip == null)
+                            changedCnt.increment();
                     }
                 }
                 finally {
@@ -539,7 +576,9 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
                     ", cache=" + cache +
                     ", part=" + part +
                     ", key=" + key +
-                    ", written=" + (reasonToSkip == null ? "true" : reasonToSkip) + ']');
+                    ", ver=" + ver +
+                    ", written=" + (reasonToSkip == null ? "true" : reasonToSkip) +
+                    ", startVer=" + (startVer != null) + ']');
             }
         }
 
@@ -559,16 +598,24 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
             long expireTime,
             KeyCacheObject key,
             CacheObject val,
-            GridCacheVersion ver
-        ) {
-            boolean written = true;
+            GridCacheVersion ver,
+            CacheObjectContext coCtx
+        ) throws IgniteCheckedException, IOException {
+            String reason = null;
 
-            if (isAfterStart(ver))
-                written = false;
-            else if (changed.get(cache).contains(key))
-                written = false;
-            else
-                write(cache, expireTime, key, val, ver);
+            if (afterStart(ver))
+                reason = "greater version";
+            else {
+                synchronized (serializer) { // Prevent concurrent access to the dump file.
+                    iterLastKeyCache = cache;
+                    iterLastKey = key;
+
+                    if (changed.get(cache).contains(key))
+                        reason = "written by listener";
+                    else
+                        write(cache, expireTime, key, val, ver, coCtx);
+                }
+            }
 
             if (log.isTraceEnabled()) {
                 log.trace("Iterator [" +
@@ -576,30 +623,31 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
                     ", cache=" + cache +
                     ", part=" + part +
                     ", key=" + key +
-                    ", written=" + written +
-                    ", ver=" + ver + ']');
+                    ", written=" + (reason == null ? "true" : reason) +
+                    ", ver=" + ver +
+                    ", startVer=" + (startVer != null) + ']');
             }
 
-            return written;
+            return reason == null;
         }
 
         /** */
-        private void write(int cache, long expireTime, KeyCacheObject key, CacheObject val, GridCacheVersion ver) {
-            synchronized (serializer) { // Prevent concurrent access to the dump file.
-                try {
-                    ByteBuffer buf = serializer.writeToBuffer(cache, expireTime, key, val, ver, cctx.cacheObjectContext(cache));
+        private void write(
+            int cache,
+            long expireTime,
+            KeyCacheObject key,
+            CacheObject val,
+            GridCacheVersion ver,
+            CacheObjectContext coCtx
+        ) throws IgniteCheckedException, IOException {
+            ByteBuffer buf = serializer.writeToBuffer(cache, expireTime, key, val, ver, coCtx);
 
-                    rateLimiter.acquire(buf.limit());
+            rateLimiter.acquire(buf.limit());
 
-                    if (file.writeFully(buf) != buf.limit())
-                        throw new IgniteException("Can't write row");
+            if (file.writeFully(buf) != buf.limit())
+                throw new IgniteException("Can't write row");
 
-                    processedSize.addAndGet(buf.limit());
-                }
-                catch (IOException | IgniteCheckedException e) {
-                    throw new IgniteException(e);
-                }
-            }
+            processedSize.addAndGet(buf.limit());
         }
 
         /**
@@ -610,10 +658,42 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple
          * @param ver Entry version.
          * @return {@code True} if {@code ver} appeared after dump started.
          */
-        private boolean isAfterStart(GridCacheVersion ver) {
+        private boolean afterStart(GridCacheVersion ver) {
             return (startVer != null && ver.isGreater(startVer)) && !isolatedStreamerVer.equals(ver);
         }
 
+        /**
+         * Iterator returned by {@link IgniteCacheOffheapManager#reservedIterator(int, AffinityTopologyVersion)}
+         * iterates keys in ascending order set by {@link CacheDataTree#compare(BPlusIO, long, int, CacheSearchRow)}.
+         * So if key changed by the user (see {@link #writeChanged(int, long, KeyCacheObject, CacheObject, GridCacheVersion)})
+         * is greater than the last key written by the partition iterator, it hasn't been saved in the dump yet
+         * and must be written. Otherwise, the key has already been saved by the iterator and must be skipped.
+         *
+         * @param cache Cache id.
+         * @param key Key to write with {@link #writeChanged(int, long, KeyCacheObject, CacheObject, GridCacheVersion)}.
+         * @return {@code True} if the key has already been written by the iterator. {@code False} otherwise.
+         * @see CacheDataTree#compareBytes(byte[], byte[])
+         */
+        private boolean writtenByIterator(int cache, KeyCacheObject key, CacheObjectContext coCtx) throws IgniteCheckedException {
+            if (iterLastKey == null)
+                return false;
+
+            int cmp = Integer.compare(iterLastKeyCache, cache);
+
+            if (cmp != 0)
+                return cmp > 0;
+
+            cmp = Integer.compare(iterLastKey.hashCode(), key.hashCode());
+
+            if (cmp != 0)
+                return cmp > 0;
+
+            byte[] bytes1 = iterLastKey.valueBytes(coCtx);
+            byte[] bytes2 = key.valueBytes(coCtx);
+
+            return CacheDataTree.compareBytes(bytes1, bytes2) >= 0;
+        }
+
         /** {@inheritDoc} */
         @Override public void close() {
             synchronized (this) { // Prevent concurrent close invocation.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
index 0eb68b5b8e2..215a7e9570f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/Dump.java
@@ -26,12 +26,10 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
@@ -304,8 +302,6 @@ public class Dump implements AutoCloseable {
         return new DumpedPartitionIterator() {
             DumpEntry next;
 
-            Set<Object> partKeys = new HashSet<>();
-
             /** {@inheritDoc} */
             @Override public boolean hasNext() {
                 advance();
@@ -334,16 +330,6 @@ public class Dump implements AutoCloseable {
 
                 try {
                     next = serializer.read(dumpFile, group, part);
-
-                    /*
-                     * During dumping entry can be dumped twice: by partition iterator and change listener.
-                     * Excluding duplicates keys from iteration.
-                     */
-                    while (next != null && !partKeys.add(next.key()))
-                        next = serializer.read(dumpFile, group, part);
-
-                    if (next == null)
-                        partKeys = null; // Let GC do the rest.
                 }
                 catch (IOException | IgniteCheckedException e) {
                     throw new IgniteException(e);
@@ -353,8 +339,6 @@ public class Dump implements AutoCloseable {
             /** {@inheritDoc} */
             @Override public void close() {
                 U.closeQuiet(dumpFile);
-
-                partKeys = null;
             }
         };
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index 9e0029cc9ff..9b29b6cdcdb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.tree;
 
+import java.util.Comparator;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.metric.IoStatisticsHolder;
 import org.apache.ignite.internal.pagemem.PageUtils;
@@ -473,6 +474,16 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
         byte[] bytes1 = other.key().valueBytes(grp.cacheObjectContext());
         byte[] bytes2 = key.valueBytes(grp.cacheObjectContext());
 
+        return compareBytes(bytes1, bytes2);
+    }
+
+    /**
+     * @param bytes1 First key bytes.
+     * @param bytes2 Second key bytes.
+     * @return Comparison result of byte arrays according to the {@link Comparator#compare(Object, Object)} contract.
+     * @see CacheDataTree#compareKeys(KeyCacheObject, long)
+     */
+    public static int compareBytes(byte[] bytes1, byte[] bytes2) {
         int lenCmp = Integer.compare(bytes1.length, bytes2.length);
 
         if (lenCmp != 0)