You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ti...@apache.org on 2022/10/27 19:58:03 UTC
[ignite] branch IGNITE-17177_inc_snapshots updated: IGNITE-17613 Copy of binary metadata added (#10350)
This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch IGNITE-17177_inc_snapshots
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/IGNITE-17177_inc_snapshots by this push:
new bd2056191ba IGNITE-17613 Copy of binary metadata added (#10350)
bd2056191ba is described below
commit bd2056191ba0f334503db69665a3ec74a5fee658
Author: Nikolay <ni...@apache.org>
AuthorDate: Thu Oct 27 22:57:52 2022 +0300
IGNITE-17613 Copy of binary metadata added (#10350)
---
.../snapshot/IncrementalSnapshotFutureTask.java | 113 +++++++++++++++------
1 file changed, 83 insertions(+), 30 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
index 2bbaad1d4c4..01c44e178c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
@@ -25,14 +27,22 @@ import java.util.Set;
import java.util.UUID;
import java.util.function.BiConsumer;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
+
/** */
class IncrementalSnapshotFutureTask
extends AbstractSnapshotFutureTask<IncrementalSnapshotFutureTaskResult>
@@ -122,41 +132,25 @@ class IncrementalSnapshotFutureTask
cctx.kernalContext().pools().getSnapshotExecutorService().submit(() -> {
try {
- // First increment must include low segment, because full snapshot knows nothing about WAL.
- // All other begins from the next segment because lowPtr already saved inside previous increment.
- long lowIdx = lowPtr.index() + (incIdx == 1 ? 0 : 1);
- long highIdx = highPtr.index();
-
- assert cctx.gridConfig().getDataStorageConfiguration().isWalCompactionEnabled()
- : "WAL Compaction must be enabled";
- assert lowIdx <= highIdx;
-
- if (log.isInfoEnabled())
- log.info("Waiting for WAL segments compression [lowIdx=" + lowIdx + ", highIdx=" + highIdx + ']');
-
- cctx.wal().awaitCompacted(highPtr.index());
+ copyWal(incSnpDir);
- if (log.isInfoEnabled()) {
- log.info("Linking WAL segments into incremental snapshot [lowIdx=" + lowIdx + ", " +
- "highIdx=" + highIdx + ']');
- }
+ File snpMarshallerDir = MarshallerContextImpl.mappingFileStoreWorkDir(incSnpDir.getAbsolutePath());
- for (; lowIdx <= highIdx; lowIdx++) {
- File seg = cctx.wal().compactedSegment(lowIdx);
+ copyFiles(
+ MarshallerContextImpl.mappingFileStoreWorkDir(cctx.gridConfig().getWorkDirectory()),
+ snpMarshallerDir,
+ BinaryUtils::notTmpFile
+ );
- if (!seg.exists()) {
- onDone(new IgniteException("WAL segment not found in archive [idx=" + lowIdx + ']'));
+ PdsFolderSettings<?> pdsSettings = cctx.kernalContext().pdsFolderResolver().resolveFolders();
- return;
- }
+ File snpBinMetaDir = new File(incSnpDir, DataStorageConfiguration.DFLT_BINARY_METADATA_PATH);
- Path segLink = incSnpDir.toPath().resolve(seg.getName());
-
- if (log.isDebugEnabled())
- log.debug("Creaing segment link [path=" + segLink.toAbsolutePath() + ']');
-
- Files.createLink(segLink, seg.toPath());
- }
+ copyFiles(
+ binaryWorkDir(cctx.gridConfig().getWorkDirectory(), pdsSettings.folderName()),
+ snpBinMetaDir,
+ file -> file.getName().endsWith(METADATA_FILE_SUFFIX)
+ );
onDone(new IncrementalSnapshotFutureTaskResult());
}
@@ -172,6 +166,65 @@ class IncrementalSnapshotFutureTask
}
}
+ /**
+ * Copies WAL segments to the incremental snapshot directory.
+ *
+ * @param incSnpDir Incremental snapshot directory.
+ * @throws IgniteInterruptedCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ private void copyWal(File incSnpDir) throws IgniteInterruptedCheckedException, IOException {
+ // First increment must include low segment, because full snapshot knows nothing about WAL.
+ // All other begins from the next segment because lowPtr already saved inside previous increment.
+ long lowIdx = lowPtr.index() + (incIdx == 1 ? 0 : 1);
+ long highIdx = highPtr.index();
+
+ assert cctx.gridConfig().getDataStorageConfiguration().isWalCompactionEnabled()
+ : "WAL Compaction must be enabled";
+ assert lowIdx <= highIdx;
+
+ if (log.isInfoEnabled())
+ log.info("Waiting for WAL segments compression [lowIdx=" + lowIdx + ", highIdx=" + highIdx + ']');
+
+ cctx.wal().awaitCompacted(highPtr.index());
+
+ if (log.isInfoEnabled()) {
+ log.info("Linking WAL segments into incremental snapshot [lowIdx=" + lowIdx + ", " +
+ "highIdx=" + highIdx + ']');
+ }
+
+ for (; lowIdx <= highIdx; lowIdx++) {
+ File seg = cctx.wal().compactedSegment(lowIdx);
+
+ if (!seg.exists())
+ throw new IgniteException("WAL segment not found in archive [idx=" + lowIdx + ']');
+
+ Path segLink = incSnpDir.toPath().resolve(seg.getName());
+
+ if (log.isDebugEnabled())
+ log.debug("Creaing segment link [path=" + segLink.toAbsolutePath() + ']');
+
+ Files.createLink(segLink, seg.toPath());
+ }
+ }
+
+ /**
+ * Copy files {@code fromDir} to {@code toDir}.
+ *
+ * @param fromDir From directory.
+ * @param toDir To directory.
+ * @param filter File filter.
+ */
+ private void copyFiles(File fromDir, File toDir, FileFilter filter) throws IOException {
+ assert fromDir.exists() && fromDir.isDirectory();
+
+ if (!toDir.isDirectory() && !toDir.exists() && !toDir.mkdirs())
+ throw new IgniteException("Target directory can't be created [target=" + toDir.getAbsolutePath() + ']');
+
+ for (File from : fromDir.listFiles(filter))
+ Files.copy(from.toPath(), new File(toDir, from.getName()).toPath());
+ }
+
/** {@inheritDoc} */
@Override public void acceptException(Throwable th) {
cctx.cache().configManager().removeConfigurationChangeListener(this);