You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/09/14 07:22:15 UTC

[ignite] branch master updated: IGNITE-17668 Fix removing archive WAL segments for in-memory CDC mode (#10248)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e6132f2cae IGNITE-17668 Fix removing archive WAL segments for in-memory CDC mode (#10248)
6e6132f2cae is described below

commit 6e6132f2caeb4f1d5b5cf5c4c40c45928e821765
Author: Nikolay <ni...@apache.org>
AuthorDate: Wed Sep 14 10:22:04 2022 +0300

    IGNITE-17668 Fix removing archive WAL segments for in-memory CDC mode (#10248)
---
 .../GridCacheDatabaseSharedManager.java            |  2 +-
 .../IgniteCacheDatabaseSharedManager.java          | 27 +++++--
 .../persistence/wal/FileWriteAheadLogManager.java  | 18 ++++-
 .../java/org/apache/ignite/cdc/WalForCdcTest.java  | 90 ++++++++++++++++++++--
 4 files changed, 118 insertions(+), 19 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 445e1062d8a..bd402a9b441 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2951,7 +2951,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      * @param highBound Upper bound.
      * @throws IgniteCheckedException If failed.
      */
-    public void onWalTruncated(@Nullable WALPointer highBound) throws IgniteCheckedException {
+    @Override public void onWalTruncated(@Nullable WALPointer highBound) throws IgniteCheckedException {
         checkpointManager.removeCheckpointsUntil(highBound);
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 8cf730a6a33..c78914136be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -1156,17 +1156,18 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
         if (!CU.isCdcEnabled(kctx.config()) || kctx.clientNode())
             return;
 
-        WALIterator iter = cctx.wal(true).replay(null, (type, ptr) -> true);
+        try (WALIterator iter = cctx.wal(true).replay(null, (type, ptr) -> true)) {
+            while (iter.hasNext())
+                iter.next();
 
-        while (iter.hasNext())
-            iter.next();
+            WALPointer ptr = iter.lastRead().orElse(null);
 
-        WALPointer ptr = iter.lastRead().orElse(null);
+            if (ptr != null)
+                ptr = ptr.next();
 
-        if (ptr != null)
-            ptr = ptr.next();
-
-        cctx.wal(true).resumeLogging(ptr);
+            cctx.wal(true).startAutoReleaseSegments();
+            cctx.wal(true).resumeLogging(ptr);
+        }
     }
 
     /**
@@ -1750,4 +1751,14 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
             (warmUpConfig) -> "Unknown data region warm-up configuration: " + errPostfix.get()
         );
     }
+
+    /**
+     * WAL truncate callback. The base implementation is a no-op.
+     *
+     * @param highBound Upper bound.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onWalTruncated(WALPointer highBound) throws IgniteCheckedException {
+        // No-op.
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index b4cad823d75..868871288e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -56,6 +56,7 @@ import java.util.zip.ZipOutputStream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.DiskPageCompression;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -81,8 +82,8 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
 import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
 import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
@@ -340,6 +341,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** Positive (non-0) value indicates WAL must be archived even if not complete. */
     private final long walForceArchiveTimeout;
 
+    /**
+     * {@code True} if WAL is enabled only for CDC.
+     * This means {@link DataRegionConfiguration#isPersistenceEnabled()} is {@code false} for all {@link DataRegion},
+     * and {@link DataRegionConfiguration#isCdcEnabled()} is {@code true} for some of them.
+     */
+    private final boolean inMemoryCdc;
+
     /**
      * Container with last WAL record logged timestamp.<br> Zero value means there was no records logged to current
      * segment, skip possible archiving for this case<br> Value is filled only for case {@link
@@ -423,6 +431,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         segmentFileInputFactory = new SimpleSegmentFileInputFactory();
         walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity();
         walForceArchiveTimeout = dsCfg.getWalForceArchiveTimeout();
+        inMemoryCdc = !CU.isPersistenceEnabled(dsCfg) && CU.isCdcEnabled(igCfg);
 
         timeoutRolloverMux = (walAutoArchiveAfterInactivity > 0 || walForceArchiveTimeout > 0) ? new Object() : null;
 
@@ -1374,6 +1383,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 segmentSize.put(idx, currSize);
             }
             finally {
+                // Move checkpoint pointer to the edge as the node doesn't have actual checkpoints
+                // in `inMemoryCdc=true` mode. This will allow the cleaner to remove segments from the archive.
+                if (inMemoryCdc)
+                    notchLastCheckpointPtr(hnd.position());
+
                 if (archiver == null)
                     segmentAware.addSize(idx, currSize - reservedSize);
             }
@@ -3294,7 +3308,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                                 + ", maxSize=" + U.humanReadableByteCount(maxWalArchiveSize) + ']');
                         }
 
-                        ((GridCacheDatabaseSharedManager)cctx.database()).onWalTruncated(highPtr);
+                        cctx.database().onWalTruncated(highPtr);
 
                         int truncated = truncate(highPtr);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
index 953f88827f2..9f39ede2252 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.ignite.cdc;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+import java.util.function.IntConsumer;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -33,10 +36,12 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
@@ -53,6 +58,9 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cdc.CdcSelfTest.WAL_ARCHIVE_TIMEOUT;
+import static org.apache.ignite.configuration.DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE;
+import static org.apache.ignite.internal.util.IgniteUtils.KB;
+import static org.apache.ignite.internal.util.IgniteUtils.MB;
 import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
@@ -76,6 +84,9 @@ public class WalForCdcTest extends GridCommonAbstractTest {
     /** */
     private boolean cdcEnabled;
 
+    /** */
+    private long archiveSz = UNLIMITED_WAL_ARCHIVE;
+
     /** */
     @Parameterized.Parameters(name = "mode={0}, atomicityMode={1}")
     public static Collection<?> parameters() {
@@ -94,6 +105,8 @@ public class WalForCdcTest extends GridCommonAbstractTest {
 
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
+            .setWalSegmentSize((int)(2 * MB))
+            .setMaxWalArchiveSize(archiveSz)
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                 .setPersistenceEnabled(persistenceEnabled)
                 .setCdcEnabled(cdcEnabled)));
@@ -196,6 +209,60 @@ public class WalForCdcTest extends GridCommonAbstractTest {
         assertNull(getFieldValue(ignite.context().cache().context(), "cdcWalMgr"));
     }
 
+    /** Tests that archived WAL segments are removed in in-memory CDC mode (persistence off, CDC on). */
+    @Test
+    public void testArchiveCleared() throws Exception {
+        persistenceEnabled = false;
+        cdcEnabled = true;
+        archiveSz = 10 * MB;
+
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache<Integer, byte[]> cache = ignite.getOrCreateCache(
+            new CacheConfiguration<Integer, byte[]>(DEFAULT_CACHE_NAME)
+                .setCacheMode(mode)
+                .setAtomicityMode(atomicityMode));
+
+        IntConsumer createData = (entryCnt) -> {
+            for (int i = 0; i < entryCnt; i++) {
+                byte[] payload = new byte[(int)KB];
+
+                ThreadLocalRandom.current().nextBytes(payload);
+
+                cache.put(i, payload);
+            }
+        };
+
+        IgniteWriteAheadLogManager wal = ignite.context().cache().context().wal(true);
+
+        long startSgmnt = wal.currentSegment();
+
+        createData.accept((int)(archiveSz / (2 * KB)));
+
+        long finishSgmnt = wal.currentSegment();
+
+        String archive = archive(ignite);
+
+        assertTrue(finishSgmnt > startSgmnt);
+        assertTrue(
+            "Wait for start segment archivation",
+            waitForCondition(() -> startSgmnt <= wal.lastArchivedSegment(), getTestTimeout())
+        );
+
+        File startSgmntArchived = new File(archive, FileDescriptor.fileName(startSgmnt));
+
+        assertTrue("Check archived segment file exists", startSgmntArchived.exists());
+
+        createData.accept((int)(archiveSz / KB));
+
+        assertTrue(
+            "Wait for archived segment cleaned",
+            waitForCondition(() -> !startSgmntArchived.exists(), getTestTimeout())
+        );
+    }
+
     /** */
     private void doTestWal(
         IgniteEx ignite,
@@ -223,16 +290,9 @@ public class WalForCdcTest extends GridCommonAbstractTest {
 
     /** */
     private int checkDataRecords(IgniteEx ignite) throws IgniteCheckedException {
-        String archive = U.resolveWorkDirectory(
-            U.defaultWorkDirectory(),
-            ignite.configuration().getDataStorageConfiguration().getWalArchivePath() + "/" +
-                U.maskForFileName(ignite.configuration().getIgniteInstanceName()),
-            false
-        ).getAbsolutePath();
-
         WALIterator iter = new IgniteWalIteratorFactory(log).iterator(new IteratorParametersBuilder()
             .ioFactory(new RandomAccessFileIOFactory())
-            .filesOrDirs(archive));
+            .filesOrDirs(archive(ignite)));
 
         int walRecCnt = 0;
 
@@ -254,4 +314,18 @@ public class WalForCdcTest extends GridCommonAbstractTest {
 
         return walRecCnt;
     }
+
+    /**
+     * @param ignite Ignite node.
+     * @return Absolute path of the WAL archive directory of the given node.
+     * @throws IgniteCheckedException If failed.
+     */
+    private static String archive(IgniteEx ignite) throws IgniteCheckedException {
+        return U.resolveWorkDirectory(
+            U.defaultWorkDirectory(),
+            ignite.configuration().getDataStorageConfiguration().getWalArchivePath() + "/" +
+                U.maskForFileName(ignite.configuration().getIgniteInstanceName()),
+            false
+        ).getAbsolutePath();
+    }
 }