You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by na...@apache.org on 2022/06/16 09:05:25 UTC
[ignite] branch master updated: IGNITE-17140 Snapshot restore: fixed remote partitions files move (#10082)
This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6c81b938d3d IGNITE-17140 Snapshot restore: fixed remote partitions files move (#10082)
6c81b938d3d is described below
commit 6c81b938d3db9b1c7bfb32a2d671afca4f8a98c1
Author: Nikita Amelchev <ns...@gmail.com>
AuthorDate: Thu Jun 16 12:05:14 2022 +0300
IGNITE-17140 Snapshot restore: fixed remote partitions files move (#10082)
---
.../snapshot/IgniteSnapshotManager.java | 25 +++-------
.../snapshot/SnapshotRestoreProcess.java | 57 +++++++++-------------
.../apache/ignite/internal/util/lang/GridFunc.java | 30 ------------
.../snapshot/IgniteSnapshotRemoteRequestTest.java | 6 +--
4 files changed, 33 insertions(+), 85 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 7647eb4b736..89fda5ef28a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -209,6 +209,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.filename.P
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.formatTmpDirName;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.T_DATA;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getPageIO;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType;
@@ -298,9 +299,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** File transmission parameter of cache partition id. */
private static final String SNP_PART_ID_PARAM = "partId";
- /** File transmission parameter of node-sender directory path with its consistentId (e.g. db/IgniteNode0). */
- private static final String SNP_DB_NODE_PATH_PARAM = "dbNodePath";
-
/** File transmission parameter of a cache directory with is currently sends its partitions. */
private static final String SNP_CACHE_DIR_NAME_PARAM = "cacheDirName";
@@ -1930,7 +1928,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
RemoteSnapshotSender remoteSnapshotSenderFactory(String rqId, UUID nodeId) {
return new RemoteSnapshotSender(log,
cctx.kernalContext().pools().getSnapshotExecutorService(),
- databaseRelativePath(pdsSettings.folderName()),
cctx.gridIO().openTransmissionSender(nodeId, DFLT_INITIAL_SNAPSHOT_TOPIC),
rqId);
}
@@ -2843,9 +2840,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** {@inheritDoc} */
@Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
Integer partId = (Integer)fileMeta.params().get(SNP_PART_ID_PARAM);
- String rmtDbNodePath = (String)fileMeta.params().get(SNP_DB_NODE_PATH_PARAM);
String cacheDirName = (String)fileMeta.params().get(SNP_CACHE_DIR_NAME_PARAM);
-
String rqId = (String)fileMeta.params().get(RQ_ID_NAME_PARAM);
Integer partsCnt = (Integer)fileMeta.params().get(SNP_PARTITIONS_CNT);
@@ -2864,11 +2859,12 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
try {
task.partsLeft.compareAndSet(-1, partsCnt);
- File cacheDir = U.resolveWorkDirectory(task.dir.toString(),
- Paths.get(rmtDbNodePath, cacheDirName).toString(),
- false);
+ File cacheDir = FilePageStoreManager.cacheWorkDir(storeMgr.workDir(), cacheDirName);
+
+ File tmpCacheDir = U.resolveWorkDirectory(storeMgr.workDir().getAbsolutePath(),
+ formatTmpDirName(cacheDir).getName(), false);
- return Paths.get(cacheDir.getAbsolutePath(), getPartitionFileName(partId)).toString();
+ return Paths.get(tmpCacheDir.getAbsolutePath(), getPartitionFileName(partId)).toString();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -2999,9 +2995,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** Snapshot name. */
private final String rqId;
- /** Local node persistent directory with consistent id. */
- private final String relativeNodePath;
-
/** The number of cache partition files expected to be processed. */
private int partsCnt;
@@ -3013,7 +3006,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
public RemoteSnapshotSender(
IgniteLogger log,
Executor exec,
- String relativeNodePath,
GridIoManager.TransmissionSender sndr,
String rqId
) {
@@ -3021,15 +3013,11 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
this.sndr = sndr;
this.rqId = rqId;
- this.relativeNodePath = relativeNodePath;
}
/** {@inheritDoc} */
@Override protected void init(int partsCnt) {
this.partsCnt = partsCnt;
-
- if (F.isEmpty(relativeNodePath))
- throw new IgniteException("Relative node path cannot be empty.");
}
/** {@inheritDoc} */
@@ -3076,7 +3064,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
params.put(SNP_GRP_ID_PARAM, pair.getGroupId());
params.put(SNP_PART_ID_PARAM, pair.getPartitionId());
- params.put(SNP_DB_NODE_PATH_PARAM, relativeNodePath);
params.put(SNP_CACHE_DIR_NAME_PARAM, cacheDirName);
params.put(RQ_ID_NAME_PARAM, rqId);
params.put(SNP_PARTITIONS_CNT, partsCnt);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 1b0a176c932..7c28fa54c57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -77,7 +77,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -621,10 +620,22 @@ public class SnapshotRestoreProcess {
* @param cacheDir Cache directory.
* @return Temporary directory.
*/
- private static File formatTmpDirName(File cacheDir) {
+ static File formatTmpDirName(File cacheDir) {
return new File(cacheDir.getParent(), TMP_CACHE_DIR_PREFIX + cacheDir.getName());
}
+ /**
+ * @param tmpCacheDir Temporary cache directory.
+ * @return Cache or group id.
+ */
+ static int groupIdFromTmpDir(File tmpCacheDir) {
+ assert tmpCacheDir.getName().startsWith(TMP_CACHE_DIR_PREFIX) : tmpCacheDir;
+
+ String cacheGrpName = tmpCacheDir.getName().substring(TMP_CACHE_DIR_PREFIX.length());
+
+ return CU.cacheId(cacheGroupName(new File(tmpCacheDir.getParentFile(), cacheGrpName)));
+ }
+
/**
* @param req Request to prepare cache group restore from the snapshot.
* @param metas Local snapshot metadatas.
@@ -978,10 +989,6 @@ public class SnapshotRestoreProcess {
(grpId, partId) -> rmtLoadParts.get(grpId) != null &&
rmtLoadParts.get(grpId).remove(new PartitionRestoreFuture(partId, opCtx0.processedParts)));
- Map<Integer, File> grpToDir = opCtx0.dirs.stream()
- .collect(Collectors.toMap(d -> CU.cacheId(FilePageStoreManager.cacheGroupName(d)),
- d -> d));
-
try {
if (log.isInfoEnabled() && !snpAff.isEmpty()) {
log.info("Trying to request partitions from remote nodes " +
@@ -1002,38 +1009,22 @@ public class SnapshotRestoreProcess {
if (opCtx0.stopChecker.getAsBoolean())
throw new IgniteInterruptedException("Snapshot remote operation request cancelled.");
- if (t == null) {
- int grpId = CU.cacheId(cacheGroupName(snpFile.getParentFile()));
- int partId = partId(snpFile.getName());
-
- PartitionRestoreFuture partFut = F.find(allParts.get(grpId),
- null,
- new IgnitePredicate<PartitionRestoreFuture>() {
- @Override public boolean apply(PartitionRestoreFuture f) {
- return f.partId == partId;
- }
- });
+ if (t != null) {
+ opCtx0.errHnd.accept(t);
+ completeListExceptionally(rmtAwaitParts, t);
- assert partFut != null : snpFile.getAbsolutePath();
+ return;
+ }
- File tmpCacheDir = formatTmpDirName(grpToDir.get(grpId));
+ int grpId = groupIdFromTmpDir(snpFile.getParentFile());
+ int partId = partId(snpFile.getName());
- Path partFile = Paths.get(tmpCacheDir.getAbsolutePath(), snpFile.getName());
+ PartitionRestoreFuture partFut = F.find(allParts.get(grpId), null,
+ fut -> fut.partId == partId);
- try {
- Files.move(snpFile.toPath(), partFile);
+ assert partFut != null : snpFile.getAbsolutePath();
- partFut.complete(partFile);
- }
- catch (Exception e) {
- opCtx0.errHnd.accept(e);
- completeListExceptionally(rmtAwaitParts, e);
- }
- }
- else {
- opCtx0.errHnd.accept(t);
- completeListExceptionally(rmtAwaitParts, t);
- }
+ partFut.complete(snpFile.toPath());
});
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index e757b2a17de..0b4d4863597 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -2085,36 +2085,6 @@ public class GridFunc {
return dfltVal;
}
- /**
- * Finds, transforms and returns first element in given collection for which any of
- * the provided predicates evaluates to {@code true}.
- *
- * @param c Input collection.
- * @param dfltVal Default value to return when no element is found.
- * @param f Transforming closure.
- * @param p Optional set of finder predicates.
- * @param <V> Type of the collection elements.
- * @return First element in given collection for which predicate evaluates to
- * {@code true} - or {@code null} if such element cannot be found.
- */
- @Deprecated
- public static <V, Y> Y find(Iterable<? extends V> c, @Nullable Y dfltVal, IgniteClosure<? super V, Y> f,
- @Nullable IgnitePredicate<? super V>... p) {
- A.notNull(c, "c", f, "f");
-
- if (isAlwaysTrue(p) && c.iterator().hasNext())
- return f.apply(c.iterator().next());
-
- if (!isEmpty(p) && !isAlwaysFalse(p)) {
- for (V v : c) {
- if (isAny(v, p))
- return f.apply(v);
- }
- }
-
- return dfltVal;
- }
-
/**
* Checks if collection {@code c1} contains any elements from collection {@code c2}.
*
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
index 0ef0740743c..d61630b6240 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
@@ -49,8 +49,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
-import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.groupIdFromTmpDir;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -222,7 +222,7 @@ public class IgniteSnapshotRemoteRequestTest extends IgniteClusterSnapshotRestor
() -> false,
(part, t) -> {
if (t == null) {
- int grpId = CU.cacheId(cacheGroupName(part.getParentFile()));
+ int grpId = groupIdFromTmpDir(part.getParentFile());
assertTrue("Received cache group has not been requested", parts.containsKey(grpId));
assertTrue("Received partition has not been requested",
@@ -309,7 +309,7 @@ public class IgniteSnapshotRemoteRequestTest extends IgniteClusterSnapshotRestor
return (part, t) -> {
assertNull(t);
- int grpId = CU.cacheId(cacheGroupName(part.getParentFile()));
+ int grpId = groupIdFromTmpDir(part.getParentFile());
assertTrue("Received cache group has not been requested", parts.containsKey(grpId));
assertTrue("Received partition has not been requested",