You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/09/14 07:22:15 UTC
[ignite] branch master updated: IGNITE-17668 Fix removing archive WAL segments for in-memory CDC mode (#10248)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6e6132f2cae IGNITE-17668 Fix removing archive WAL segments for in-memory CDC mode (#10248)
6e6132f2cae is described below
commit 6e6132f2caeb4f1d5b5cf5c4c40c45928e821765
Author: Nikolay <ni...@apache.org>
AuthorDate: Wed Sep 14 10:22:04 2022 +0300
IGNITE-17668 Fix removing archive WAL segments for in-memory CDC mode (#10248)
---
.../GridCacheDatabaseSharedManager.java | 2 +-
.../IgniteCacheDatabaseSharedManager.java | 27 +++++--
.../persistence/wal/FileWriteAheadLogManager.java | 18 ++++-
.../java/org/apache/ignite/cdc/WalForCdcTest.java | 90 ++++++++++++++++++++--
4 files changed, 118 insertions(+), 19 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 445e1062d8a..bd402a9b441 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2951,7 +2951,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @param highBound Upper bound.
* @throws IgniteCheckedException If failed.
*/
- public void onWalTruncated(@Nullable WALPointer highBound) throws IgniteCheckedException {
+ @Override public void onWalTruncated(@Nullable WALPointer highBound) throws IgniteCheckedException {
checkpointManager.removeCheckpointsUntil(highBound);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 8cf730a6a33..c78914136be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -1156,17 +1156,18 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
if (!CU.isCdcEnabled(kctx.config()) || kctx.clientNode())
return;
- WALIterator iter = cctx.wal(true).replay(null, (type, ptr) -> true);
+ try (WALIterator iter = cctx.wal(true).replay(null, (type, ptr) -> true)) {
+ while (iter.hasNext())
+ iter.next();
- while (iter.hasNext())
- iter.next();
+ WALPointer ptr = iter.lastRead().orElse(null);
- WALPointer ptr = iter.lastRead().orElse(null);
+ if (ptr != null)
+ ptr = ptr.next();
- if (ptr != null)
- ptr = ptr.next();
-
- cctx.wal(true).resumeLogging(ptr);
+ cctx.wal(true).startAutoReleaseSegments();
+ cctx.wal(true).resumeLogging(ptr);
+ }
}
/**
@@ -1750,4 +1751,14 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
(warmUpConfig) -> "Unknown data region warm-up configuration: " + errPostfix.get()
);
}
+
+ /**
+ * WAL truncate callback.
+ *
+ * @param highBound Upper bound.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onWalTruncated(WALPointer highBound) throws IgniteCheckedException {
+ // No-op.
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index b4cad823d75..868871288e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -56,6 +56,7 @@ import java.util.zip.ZipOutputStream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -81,8 +82,8 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
@@ -340,6 +341,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** Positive (non-0) value indicates WAL must be archived even if not complete. */
private final long walForceArchiveTimeout;
+ /**
+ * {@code True} if WAL is enabled only for CDC.
+ * This means {@link DataRegionConfiguration#isPersistenceEnabled()} is {@code false} for all {@link DataRegion},
+ * and {@link DataRegionConfiguration#isCdcEnabled()} is {@code true} for some of them.
+ */
+ private final boolean inMemoryCdc;
+
/**
* Container with last WAL record logged timestamp.<br> Zero value means there was no records logged to current
* segment, skip possible archiving for this case<br> Value is filled only for case {@link
@@ -423,6 +431,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
segmentFileInputFactory = new SimpleSegmentFileInputFactory();
walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity();
walForceArchiveTimeout = dsCfg.getWalForceArchiveTimeout();
+ inMemoryCdc = !CU.isPersistenceEnabled(dsCfg) && CU.isCdcEnabled(igCfg);
timeoutRolloverMux = (walAutoArchiveAfterInactivity > 0 || walForceArchiveTimeout > 0) ? new Object() : null;
@@ -1374,6 +1383,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
segmentSize.put(idx, currSize);
}
finally {
+ // Move the checkpoint pointer to the edge, as the node doesn't have actual checkpoints in `inMemoryCdc=true` mode.
+ // This allows the cleaner to remove segments from the archive.
+ if (inMemoryCdc)
+ notchLastCheckpointPtr(hnd.position());
+
if (archiver == null)
segmentAware.addSize(idx, currSize - reservedSize);
}
@@ -3294,7 +3308,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
+ ", maxSize=" + U.humanReadableByteCount(maxWalArchiveSize) + ']');
}
- ((GridCacheDatabaseSharedManager)cctx.database()).onWalTruncated(highPtr);
+ cctx.database().onWalTruncated(highPtr);
int truncated = truncate(highPtr);
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
index 953f88827f2..9f39ede2252 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
@@ -17,12 +17,15 @@
package org.apache.ignite.cdc;
+import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import java.util.function.IntConsumer;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -33,10 +36,12 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
@@ -53,6 +58,9 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cdc.CdcSelfTest.WAL_ARCHIVE_TIMEOUT;
+import static org.apache.ignite.configuration.DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE;
+import static org.apache.ignite.internal.util.IgniteUtils.KB;
+import static org.apache.ignite.internal.util.IgniteUtils.MB;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -76,6 +84,9 @@ public class WalForCdcTest extends GridCommonAbstractTest {
/** */
private boolean cdcEnabled;
+ /** */
+ private long archiveSz = UNLIMITED_WAL_ARCHIVE;
+
/** */
@Parameterized.Parameters(name = "mode={0}, atomicityMode={1}")
public static Collection<?> parameters() {
@@ -94,6 +105,8 @@ public class WalForCdcTest extends GridCommonAbstractTest {
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
+ .setWalSegmentSize((int)(2 * MB))
+ .setMaxWalArchiveSize(archiveSz)
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(persistenceEnabled)
.setCdcEnabled(cdcEnabled)));
@@ -196,6 +209,60 @@ public class WalForCdcTest extends GridCommonAbstractTest {
assertNull(getFieldValue(ignite.context().cache().context(), "cdcWalMgr"));
}
+ /** */
+ @Test
+ public void testArchiveCleared() throws Exception {
+ persistenceEnabled = false;
+ cdcEnabled = true;
+ archiveSz = 10 * MB;
+
+ IgniteEx ignite = startGrid(0);
+
+ ignite.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<Integer, byte[]> cache = ignite.getOrCreateCache(
+ new CacheConfiguration<Integer, byte[]>(DEFAULT_CACHE_NAME)
+ .setCacheMode(mode)
+ .setAtomicityMode(atomicityMode));
+
+ IntConsumer createData = (entryCnt) -> {
+ for (int i = 0; i < entryCnt; i++) {
+ byte[] payload = new byte[(int)KB];
+
+ ThreadLocalRandom.current().nextBytes(payload);
+
+ cache.put(i, payload);
+ }
+ };
+
+ IgniteWriteAheadLogManager wal = ignite.context().cache().context().wal(true);
+
+ long startSgmnt = wal.currentSegment();
+
+ createData.accept((int)(archiveSz / (2 * KB)));
+
+ long finishSgmnt = wal.currentSegment();
+
+ String archive = archive(ignite);
+
+ assertTrue(finishSgmnt > startSgmnt);
+ assertTrue(
+ "Wait for start segment archivation",
+ waitForCondition(() -> startSgmnt <= wal.lastArchivedSegment(), getTestTimeout())
+ );
+
+ File startSgmntArchived = new File(archive, FileDescriptor.fileName(startSgmnt));
+
+ assertTrue("Check archived segment file exists", startSgmntArchived.exists());
+
+ createData.accept((int)(archiveSz / KB));
+
+ assertTrue(
+ "Wait for archived segment cleaned",
+ waitForCondition(() -> !startSgmntArchived.exists(), getTestTimeout())
+ );
+ }
+
/** */
private void doTestWal(
IgniteEx ignite,
@@ -223,16 +290,9 @@ public class WalForCdcTest extends GridCommonAbstractTest {
/** */
private int checkDataRecords(IgniteEx ignite) throws IgniteCheckedException {
- String archive = U.resolveWorkDirectory(
- U.defaultWorkDirectory(),
- ignite.configuration().getDataStorageConfiguration().getWalArchivePath() + "/" +
- U.maskForFileName(ignite.configuration().getIgniteInstanceName()),
- false
- ).getAbsolutePath();
-
WALIterator iter = new IgniteWalIteratorFactory(log).iterator(new IteratorParametersBuilder()
.ioFactory(new RandomAccessFileIOFactory())
- .filesOrDirs(archive));
+ .filesOrDirs(archive(ignite)));
int walRecCnt = 0;
@@ -254,4 +314,18 @@ public class WalForCdcTest extends GridCommonAbstractTest {
return walRecCnt;
}
+
+ /**
+ * @param ignite Ignite.
+ * @return WAL archive path.
+ * @throws IgniteCheckedException If failed
+ */
+ private static String archive(IgniteEx ignite) throws IgniteCheckedException {
+ return U.resolveWorkDirectory(
+ U.defaultWorkDirectory(),
+ ignite.configuration().getDataStorageConfiguration().getWalArchivePath() + "/" +
+ U.maskForFileName(ignite.configuration().getIgniteInstanceName()),
+ false
+ ).getAbsolutePath();
+ }
}