You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ti...@apache.org on 2022/10/27 19:58:03 UTC

[ignite] branch IGNITE-17177_inc_snapshots updated: IGNITE-17613 Copy of binary metadata added (#10350)

This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch IGNITE-17177_inc_snapshots
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/IGNITE-17177_inc_snapshots by this push:
     new bd2056191ba IGNITE-17613 Copy of binary metadata added (#10350)
bd2056191ba is described below

commit bd2056191ba0f334503db69665a3ec74a5fee658
Author: Nikolay <ni...@apache.org>
AuthorDate: Thu Oct 27 22:57:52 2022 +0300

    IGNITE-17613 Copy of binary metadata added (#10350)
---
 .../snapshot/IncrementalSnapshotFutureTask.java    | 113 +++++++++++++++------
 1 file changed, 83 insertions(+), 30 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
index 2bbaad1d4c4..01c44e178c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.HashSet;
@@ -25,14 +27,22 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiConsumer;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
+
 /** */
 class IncrementalSnapshotFutureTask
     extends AbstractSnapshotFutureTask<IncrementalSnapshotFutureTaskResult>
@@ -122,41 +132,25 @@ class IncrementalSnapshotFutureTask
 
             cctx.kernalContext().pools().getSnapshotExecutorService().submit(() -> {
                 try {
-                    // First increment must include low segment, because full snapshot knows nothing about WAL.
-                    // All other begins from the next segment because lowPtr already saved inside previous increment.
-                    long lowIdx = lowPtr.index() + (incIdx == 1 ? 0 : 1);
-                    long highIdx = highPtr.index();
-
-                    assert cctx.gridConfig().getDataStorageConfiguration().isWalCompactionEnabled()
-                        : "WAL Compaction must be enabled";
-                    assert lowIdx <= highIdx;
-
-                    if (log.isInfoEnabled())
-                        log.info("Waiting for WAL segments compression [lowIdx=" + lowIdx + ", highIdx=" + highIdx + ']');
-
-                    cctx.wal().awaitCompacted(highPtr.index());
+                    copyWal(incSnpDir);
 
-                    if (log.isInfoEnabled()) {
-                        log.info("Linking WAL segments into incremental snapshot [lowIdx=" + lowIdx + ", " +
-                            "highIdx=" + highIdx + ']');
-                    }
+                    File snpMarshallerDir = MarshallerContextImpl.mappingFileStoreWorkDir(incSnpDir.getAbsolutePath());
 
-                    for (; lowIdx <= highIdx; lowIdx++) {
-                        File seg = cctx.wal().compactedSegment(lowIdx);
+                    copyFiles(
+                        MarshallerContextImpl.mappingFileStoreWorkDir(cctx.gridConfig().getWorkDirectory()),
+                        snpMarshallerDir,
+                        BinaryUtils::notTmpFile
+                    );
 
-                        if (!seg.exists()) {
-                            onDone(new IgniteException("WAL segment not found in archive [idx=" + lowIdx + ']'));
+                    PdsFolderSettings<?> pdsSettings = cctx.kernalContext().pdsFolderResolver().resolveFolders();
 
-                            return;
-                        }
+                    File snpBinMetaDir = new File(incSnpDir, DataStorageConfiguration.DFLT_BINARY_METADATA_PATH);
 
-                        Path segLink = incSnpDir.toPath().resolve(seg.getName());
-
-                        if (log.isDebugEnabled())
-                            log.debug("Creaing segment link [path=" + segLink.toAbsolutePath() + ']');
-
-                        Files.createLink(segLink, seg.toPath());
-                    }
+                    copyFiles(
+                        binaryWorkDir(cctx.gridConfig().getWorkDirectory(), pdsSettings.folderName()),
+                        snpBinMetaDir,
+                        file -> file.getName().endsWith(METADATA_FILE_SUFFIX)
+                    );
 
                     onDone(new IncrementalSnapshotFutureTaskResult());
                 }
@@ -172,6 +166,65 @@ class IncrementalSnapshotFutureTask
         }
     }
 
+    /**
+     * Copies WAL segments to the incremental snapshot directory.
+     *
+     * @param incSnpDir Incremental snapshot directory.
+     * @throws IgniteInterruptedCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    private void copyWal(File incSnpDir) throws IgniteInterruptedCheckedException, IOException {
+        // First increment must include low segment, because full snapshot knows nothing about WAL.
+        // All other begins from the next segment because lowPtr already saved inside previous increment.
+        long lowIdx = lowPtr.index() + (incIdx == 1 ? 0 : 1);
+        long highIdx = highPtr.index();
+
+        assert cctx.gridConfig().getDataStorageConfiguration().isWalCompactionEnabled()
+            : "WAL Compaction must be enabled";
+        assert lowIdx <= highIdx;
+
+        if (log.isInfoEnabled())
+            log.info("Waiting for WAL segments compression [lowIdx=" + lowIdx + ", highIdx=" + highIdx + ']');
+
+        cctx.wal().awaitCompacted(highPtr.index());
+
+        if (log.isInfoEnabled()) {
+            log.info("Linking WAL segments into incremental snapshot [lowIdx=" + lowIdx + ", " +
+                "highIdx=" + highIdx + ']');
+        }
+
+        for (; lowIdx <= highIdx; lowIdx++) {
+            File seg = cctx.wal().compactedSegment(lowIdx);
+
+            if (!seg.exists())
+                throw new IgniteException("WAL segment not found in archive [idx=" + lowIdx + ']');
+
+            Path segLink = incSnpDir.toPath().resolve(seg.getName());
+
+            if (log.isDebugEnabled())
+                log.debug("Creaing segment link [path=" + segLink.toAbsolutePath() + ']');
+
+            Files.createLink(segLink, seg.toPath());
+        }
+    }
+
+    /**
+     * Copy files {@code fromDir} to {@code toDir}.
+     *
+     * @param fromDir From directory.
+     * @param toDir To directory.
+     * @param filter File filter.
+     */
+    private void copyFiles(File fromDir, File toDir, FileFilter filter) throws IOException {
+        assert fromDir.exists() && fromDir.isDirectory();
+
+        if (!toDir.isDirectory() && !toDir.exists() && !toDir.mkdirs())
+            throw new IgniteException("Target directory can't be created [target=" + toDir.getAbsolutePath() + ']');
+
+        for (File from : fromDir.listFiles(filter))
+            Files.copy(from.toPath(), new File(toDir, from.getName()).toPath());
+    }
+
     /** {@inheritDoc} */
     @Override public void acceptException(Throwable th) {
         cctx.cache().configManager().removeConfigurationChangeListener(this);