You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by na...@apache.org on 2022/06/16 09:05:25 UTC

[ignite] branch master updated: IGNITE-17140 Snapshot restore: fixed remote partitions files move (#10082)

This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c81b938d3d IGNITE-17140 Snapshot restore: fixed remote partitions files move (#10082)
6c81b938d3d is described below

commit 6c81b938d3db9b1c7bfb32a2d671afca4f8a98c1
Author: Nikita Amelchev <ns...@gmail.com>
AuthorDate: Thu Jun 16 12:05:14 2022 +0300

    IGNITE-17140 Snapshot restore: fixed remote partitions files move (#10082)
---
 .../snapshot/IgniteSnapshotManager.java            | 25 +++-------
 .../snapshot/SnapshotRestoreProcess.java           | 57 +++++++++-------------
 .../apache/ignite/internal/util/lang/GridFunc.java | 30 ------------
 .../snapshot/IgniteSnapshotRemoteRequestTest.java  |  6 +--
 4 files changed, 33 insertions(+), 85 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 7647eb4b736..89fda5ef28a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -209,6 +209,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.filename.P
 import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
 import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
 import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.formatTmpDirName;
 import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.T_DATA;
 import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getPageIO;
 import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType;
@@ -298,9 +299,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     /** File transmission parameter of cache partition id. */
     private static final String SNP_PART_ID_PARAM = "partId";
 
-    /** File transmission parameter of node-sender directory path with its consistentId (e.g. db/IgniteNode0). */
-    private static final String SNP_DB_NODE_PATH_PARAM = "dbNodePath";
-
     /** File transmission parameter of a cache directory which is currently sending its partitions. */
     private static final String SNP_CACHE_DIR_NAME_PARAM = "cacheDirName";
 
@@ -1930,7 +1928,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     RemoteSnapshotSender remoteSnapshotSenderFactory(String rqId, UUID nodeId) {
         return new RemoteSnapshotSender(log,
             cctx.kernalContext().pools().getSnapshotExecutorService(),
-            databaseRelativePath(pdsSettings.folderName()),
             cctx.gridIO().openTransmissionSender(nodeId, DFLT_INITIAL_SNAPSHOT_TOPIC),
             rqId);
     }
@@ -2843,9 +2840,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         /** {@inheritDoc} */
         @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
             Integer partId = (Integer)fileMeta.params().get(SNP_PART_ID_PARAM);
-            String rmtDbNodePath = (String)fileMeta.params().get(SNP_DB_NODE_PATH_PARAM);
             String cacheDirName = (String)fileMeta.params().get(SNP_CACHE_DIR_NAME_PARAM);
-
             String rqId = (String)fileMeta.params().get(RQ_ID_NAME_PARAM);
             Integer partsCnt = (Integer)fileMeta.params().get(SNP_PARTITIONS_CNT);
 
@@ -2864,11 +2859,12 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             try {
                 task.partsLeft.compareAndSet(-1, partsCnt);
 
-                File cacheDir = U.resolveWorkDirectory(task.dir.toString(),
-                    Paths.get(rmtDbNodePath, cacheDirName).toString(),
-                    false);
+                File cacheDir = FilePageStoreManager.cacheWorkDir(storeMgr.workDir(), cacheDirName);
+
+                File tmpCacheDir = U.resolveWorkDirectory(storeMgr.workDir().getAbsolutePath(),
+                    formatTmpDirName(cacheDir).getName(), false);
 
-                return Paths.get(cacheDir.getAbsolutePath(), getPartitionFileName(partId)).toString();
+                return Paths.get(tmpCacheDir.getAbsolutePath(), getPartitionFileName(partId)).toString();
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);
@@ -2999,9 +2995,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         /** Snapshot name. */
         private final String rqId;
 
-        /** Local node persistent directory with consistent id. */
-        private final String relativeNodePath;
-
         /** The number of cache partition files expected to be processed. */
         private int partsCnt;
 
@@ -3013,7 +3006,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         public RemoteSnapshotSender(
             IgniteLogger log,
             Executor exec,
-            String relativeNodePath,
             GridIoManager.TransmissionSender sndr,
             String rqId
         ) {
@@ -3021,15 +3013,11 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
 
             this.sndr = sndr;
             this.rqId = rqId;
-            this.relativeNodePath = relativeNodePath;
         }
 
         /** {@inheritDoc} */
         @Override protected void init(int partsCnt) {
             this.partsCnt = partsCnt;
-
-            if (F.isEmpty(relativeNodePath))
-                throw new IgniteException("Relative node path cannot be empty.");
         }
 
         /** {@inheritDoc} */
@@ -3076,7 +3064,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
 
             params.put(SNP_GRP_ID_PARAM, pair.getGroupId());
             params.put(SNP_PART_ID_PARAM, pair.getPartitionId());
-            params.put(SNP_DB_NODE_PATH_PARAM, relativeNodePath);
             params.put(SNP_CACHE_DIR_NAME_PARAM, cacheDirName);
             params.put(RQ_ID_NAME_PARAM, rqId);
             params.put(SNP_PARTITIONS_CNT, partsCnt);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 1b0a176c932..7c28fa54c57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -77,7 +77,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -621,10 +620,22 @@ public class SnapshotRestoreProcess {
      * @param cacheDir Cache directory.
      * @return Temporary directory.
      */
-    private static File formatTmpDirName(File cacheDir) {
+    static File formatTmpDirName(File cacheDir) {
         return new File(cacheDir.getParent(), TMP_CACHE_DIR_PREFIX + cacheDir.getName());
     }
 
+    /**
+     * @param tmpCacheDir Temporary cache directory.
+     * @return Cache or group id.
+     */
+    static int groupIdFromTmpDir(File tmpCacheDir) {
+        assert tmpCacheDir.getName().startsWith(TMP_CACHE_DIR_PREFIX) : tmpCacheDir;
+
+        String cacheGrpName = tmpCacheDir.getName().substring(TMP_CACHE_DIR_PREFIX.length());
+
+        return CU.cacheId(cacheGroupName(new File(tmpCacheDir.getParentFile(), cacheGrpName)));
+    }
+
     /**
      * @param req Request to prepare cache group restore from the snapshot.
      * @param metas Local snapshot metadatas.
@@ -978,10 +989,6 @@ public class SnapshotRestoreProcess {
                 (grpId, partId) -> rmtLoadParts.get(grpId) != null &&
                     rmtLoadParts.get(grpId).remove(new PartitionRestoreFuture(partId, opCtx0.processedParts)));
 
-            Map<Integer, File> grpToDir = opCtx0.dirs.stream()
-                .collect(Collectors.toMap(d -> CU.cacheId(FilePageStoreManager.cacheGroupName(d)),
-                    d -> d));
-
             try {
                 if (log.isInfoEnabled() && !snpAff.isEmpty()) {
                     log.info("Trying to request partitions from remote nodes " +
@@ -1002,38 +1009,22 @@ public class SnapshotRestoreProcess {
                                 if (opCtx0.stopChecker.getAsBoolean())
                                     throw new IgniteInterruptedException("Snapshot remote operation request cancelled.");
 
-                                if (t == null) {
-                                    int grpId = CU.cacheId(cacheGroupName(snpFile.getParentFile()));
-                                    int partId = partId(snpFile.getName());
-
-                                    PartitionRestoreFuture partFut = F.find(allParts.get(grpId),
-                                        null,
-                                        new IgnitePredicate<PartitionRestoreFuture>() {
-                                            @Override public boolean apply(PartitionRestoreFuture f) {
-                                                return f.partId == partId;
-                                            }
-                                        });
+                                if (t != null) {
+                                    opCtx0.errHnd.accept(t);
+                                    completeListExceptionally(rmtAwaitParts, t);
 
-                                    assert partFut != null : snpFile.getAbsolutePath();
+                                    return;
+                                }
 
-                                    File tmpCacheDir = formatTmpDirName(grpToDir.get(grpId));
+                                int grpId = groupIdFromTmpDir(snpFile.getParentFile());
+                                int partId = partId(snpFile.getName());
 
-                                    Path partFile = Paths.get(tmpCacheDir.getAbsolutePath(), snpFile.getName());
+                                PartitionRestoreFuture partFut = F.find(allParts.get(grpId), null,
+                                    fut -> fut.partId == partId);
 
-                                    try {
-                                        Files.move(snpFile.toPath(), partFile);
+                                assert partFut != null : snpFile.getAbsolutePath();
 
-                                        partFut.complete(partFile);
-                                    }
-                                    catch (Exception e) {
-                                        opCtx0.errHnd.accept(e);
-                                        completeListExceptionally(rmtAwaitParts, e);
-                                    }
-                                }
-                                else {
-                                    opCtx0.errHnd.accept(t);
-                                    completeListExceptionally(rmtAwaitParts, t);
-                                }
+                                partFut.complete(snpFile.toPath());
                             });
                 }
             }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index e757b2a17de..0b4d4863597 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -2085,36 +2085,6 @@ public class GridFunc {
         return dfltVal;
     }
 
-    /**
-     * Finds, transforms and returns first element in given collection for which any of
-     * the provided predicates evaluates to {@code true}.
-     *
-     * @param c Input collection.
-     * @param dfltVal Default value to return when no element is found.
-     * @param f Transforming closure.
-     * @param p Optional set of finder predicates.
-     * @param <V> Type of the collection elements.
-     * @return First element in given collection for which predicate evaluates to
-     *      {@code true} - or {@code null} if such element cannot be found.
-     */
-    @Deprecated
-    public static <V, Y> Y find(Iterable<? extends V> c, @Nullable Y dfltVal, IgniteClosure<? super V, Y> f,
-        @Nullable IgnitePredicate<? super V>... p) {
-        A.notNull(c, "c", f, "f");
-
-        if (isAlwaysTrue(p) && c.iterator().hasNext())
-            return f.apply(c.iterator().next());
-
-        if (!isEmpty(p) && !isAlwaysFalse(p)) {
-            for (V v : c) {
-                if (isAny(v, p))
-                    return f.apply(v);
-            }
-        }
-
-        return dfltVal;
-    }
-
     /**
      * Checks if collection {@code c1} contains any elements from collection {@code c2}.
      *
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
index 0ef0740743c..d61630b6240 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
@@ -49,8 +49,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
-import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.groupIdFromTmpDir;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
@@ -222,7 +222,7 @@ public class IgniteSnapshotRemoteRequestTest extends IgniteClusterSnapshotRestor
             () -> false,
             (part, t) -> {
                 if (t == null) {
-                    int grpId = CU.cacheId(cacheGroupName(part.getParentFile()));
+                    int grpId = groupIdFromTmpDir(part.getParentFile());
 
                     assertTrue("Received cache group has not been requested", parts.containsKey(grpId));
                     assertTrue("Received partition has not been requested",
@@ -309,7 +309,7 @@ public class IgniteSnapshotRemoteRequestTest extends IgniteClusterSnapshotRestor
         return (part, t) -> {
             assertNull(t);
 
-            int grpId = CU.cacheId(cacheGroupName(part.getParentFile()));
+            int grpId = groupIdFromTmpDir(part.getParentFile());
 
             assertTrue("Received cache group has not been requested", parts.containsKey(grpId));
             assertTrue("Received partition has not been requested",