You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/02/10 07:09:05 UTC
[ignite] branch master updated: IGNITE-13877 Restructuring WAL work
directory after enabling WAL archive. - Fixes #8681.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new defc398 IGNITE-13877 Restructuring WAL work directory after enabling WAL archive. - Fixes #8681.
defc398 is described below
commit defc39832e53b4032ba2883fbdaed77228c99ac2
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed Feb 10 09:48:14 2021 +0300
IGNITE-13877 Restructuring WAL work directory after enabling WAL archive. - Fixes #8681.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../persistence/wal/FileWriteAheadLogManager.java | 228 +++++++++++++---
.../persistence/wal/WalArchiveConsistencyTest.java | 294 +++++++++++++++++++++
.../testframework/junits/GridAbstractTest.java | 16 ++
.../ignite/testsuites/IgnitePdsTestSuite.java | 3 +
4 files changed, 508 insertions(+), 33 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index a7eee3c..a22ef7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -50,7 +50,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
@@ -127,9 +126,12 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAIT_TIME_NEXT_WAL_SEGMENT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE;
@@ -216,13 +218,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
private final boolean mmap = IgniteSystemProperties.getBoolean(IGNITE_WAL_MMAP, DFLT_WAL_MMAP);
/**
- * Percentage of WAL archive size to calculate threshold since which removing of old archive should be started.
- */
- private static final double THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE =
- IgniteSystemProperties.getDouble(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE,
- DFLT_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE);
-
- /**
* Number of WAL compressor worker threads.
*/
private final int WAL_COMPRESSOR_WORKER_THREAD_CNT =
@@ -407,7 +402,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
segmentFileInputFactory = new SimpleSegmentFileInputFactory();
walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity();
- allowedThresholdWalArchiveSize = (long)(dsCfg.getMaxWalArchiveSize() * THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE);
+ double thresholdWalArchiveSizePercentage = IgniteSystemProperties.getDouble(
+ IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, DFLT_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE);
+
+ allowedThresholdWalArchiveSize = (long)(dsCfg.getMaxWalArchiveSize() * thresholdWalArchiveSizePercentage);
evt = ctx.event();
failureProcessor = ctx.failure();
@@ -462,8 +460,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
metrics = dbMgr.persistentStoreMetricsImpl();
- checkOrPrepareFiles();
-
if (metrics != null) {
metrics.setWalSizeProvider(new CO<Long>() {
/** {@inheritDoc} */
@@ -492,11 +488,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
if (isArchiverEnabled())
- archiver = new FileArchiver(segmentAware, log);
+ archiver = new FileArchiver(log);
if (!walArchiveUnlimited())
cleaner = new FileCleaner(log);
+ prepareAndCheckWalFiles();
+
+ if (compressor != null)
+ compressor.initAlreadyCompressedSegments();
+
+ if (archiver != null)
+ archiver.init(segmentAware);
+
segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir, segmentAware, dsCfg);
fileHandleManager = fileHandleManagerFactory.build(
@@ -1502,11 +1506,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * Deletes temp files creates and prepares new; Creates the first segment if necessary.
+ * Prepare and check WAL files.
*
* @throws StorageException If failed.
*/
- private void checkOrPrepareFiles() throws StorageException {
+ private void prepareAndCheckWalFiles() throws StorageException {
Collection<File> tmpFiles = new HashSet<>();
for (File walDir : F.asList(walWorkDir, walArchiveDir)) {
@@ -1521,21 +1525,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
}
- File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER);
+ if (F.isEmpty(walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER)))
+ createFile(new File(walWorkDir, fileName(0)));
- if (isArchiverEnabled() && !F.isEmpty(allFiles) && allFiles.length > dsCfg.getWalSegments()) {
- throw new StorageException("Failed to initialize wal (work directory contains incorrect " +
- "number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']');
- }
+ if (isArchiverEnabled()) {
+ moveSegmentsToArchive();
- // Allocate the first segment synchronously. All other segments will be allocated by archiver in background.
- if (F.isEmpty(allFiles)) {
- File first = new File(walWorkDir, fileName(0));
+ renameLastSegment();
+
+ formatWorkSegments();
- createFile(first);
- }
- else if (isArchiverEnabled())
checkFiles(0, false, null, null);
+ }
}
/**
@@ -1724,14 +1725,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/**
* Constructor.
*
- * @param segmentAware Segment aware.
* @param log Logger.
*/
- private FileArchiver(SegmentAware segmentAware, IgniteLogger log) throws IgniteCheckedException {
+ private FileArchiver(IgniteLogger log) {
super(cctx.igniteInstanceName(), "wal-file-archiver%" + cctx.igniteInstanceName(), log,
cctx.kernalContext().workersRegistry());
-
- init(segmentAware);
}
/**
@@ -2048,7 +2046,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/**
* Background creation of all segments except first. First segment was created in main thread by {@link
- * FileWriteAheadLogManager#checkOrPrepareFiles()}
+ * FileWriteAheadLogManager#prepareAndCheckWalFiles()}
*/
private void allocateRemainingFiles() throws StorageException {
checkFiles(
@@ -2092,8 +2090,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*/
FileCompressor(IgniteLogger log) {
super(0, log);
-
- initAlreadyCompressedSegments();
}
/** */
@@ -2936,7 +2932,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (files == null)
return Collections.emptyList();
- return Arrays.stream(files).map(File::getName).sorted().collect(Collectors.toList());
+ return Arrays.stream(files).map(File::getName).sorted().collect(toList());
}
/** {@inheritDoc} */
@@ -3252,4 +3248,170 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
new IgniteThread(this).start();
}
}
+
+ /**
+ * Moving working segments to archive, if segments are more than {@link DataStorageConfiguration#getWalSegments()}
+ * or index of first segment is not 0. All segments will be moved except for last one,
+ * as well as all compressed segments.
+ *
+ * @throws StorageException If an error occurs while moving.
+ */
+ private void moveSegmentsToArchive() throws StorageException {
+ assert isArchiverEnabled();
+
+ FileDescriptor[] workSegments = scan(walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER));
+
+ List<FileDescriptor> toMove = new ArrayList<>();
+
+ if (!F.isEmpty(workSegments) && (workSegments.length > dsCfg.getWalSegments() || workSegments[0].idx() != 0))
+ toMove.addAll(F.asList(workSegments).subList(0, workSegments.length - 1));
+
+ toMove.addAll(F.asList(scan(walWorkDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER))));
+
+ if (!toMove.isEmpty()) {
+ log.warning("Content of WAL working directory needs rearrangement, some WAL segments will be moved to " +
+ "archive: " + walArchiveDir.getAbsolutePath() + ". Segments from " + toMove.get(0).file().getName() +
+ " to " + toMove.get(toMove.size() - 1).file().getName() + " will be moved, total number of files: " +
+ toMove.size() + ". This operation may take some time.");
+
+ for (int i = 0, j = 0; i < toMove.size(); i++) {
+ FileDescriptor fd = toMove.get(i);
+
+ File tmpDst = new File(walArchiveDir, fd.file().getName() + TMP_SUFFIX);
+ File dst = new File(walArchiveDir, fd.file().getName());
+
+ try {
+ Files.copy(fd.file().toPath(), tmpDst.toPath());
+
+ Files.move(tmpDst.toPath(), dst.toPath());
+
+ Files.delete(fd.file().toPath());
+
+ if (log.isDebugEnabled()) {
+ log.debug("WAL segment moved [src=" + fd.file().getAbsolutePath() +
+ ", dst=" + dst.getAbsolutePath() + ']');
+ }
+
+ // Batch output.
+ if (log.isInfoEnabled() && (i == toMove.size() - 1 || (i != 0 && i % 9 == 0))) {
+ log.info("WAL segments moved: " + toMove.get(j).file().getName() +
+ (i == j ? "" : " - " + toMove.get(i).file().getName()));
+
+ j = i + 1;
+ }
+ }
+ catch (IOException e) {
+ throw new StorageException("Failed to move WAL segment [src=" + fd.file().getAbsolutePath() +
+ ", dst=" + dst.getAbsolutePath() + ']', e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Renaming last segment if it is only one and its index is greater than {@link DataStorageConfiguration#getWalSegments()}.
+ *
+ * @throws StorageException If an error occurs while renaming.
+ */
+ private void renameLastSegment() throws StorageException {
+ assert isArchiverEnabled();
+
+ FileDescriptor[] workSegments = scan(walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER));
+
+ if (workSegments.length == 1 && workSegments[0].idx() != workSegments[0].idx() % dsCfg.getWalSegments()) {
+ FileDescriptor toRen = workSegments[0];
+
+ if (log.isInfoEnabled()) {
+ log.info("Last WAL segment file has to be renamed from " + toRen.file().getName() + " to " +
+ fileName(toRen.idx() % dsCfg.getWalSegments()) + '.');
+ }
+
+ String toRenFileName = fileName(toRen.idx() % dsCfg.getWalSegments());
+
+ File tmpDst = new File(walWorkDir, toRenFileName + TMP_SUFFIX);
+ File dst = new File(walWorkDir, toRenFileName);
+
+ try {
+ Files.copy(toRen.file().toPath(), tmpDst.toPath());
+
+ Files.move(tmpDst.toPath(), dst.toPath());
+
+ Files.delete(toRen.file().toPath());
+
+ if (log.isInfoEnabled()) {
+ log.info("WAL segment renamed [src=" + toRen.file().getAbsolutePath() +
+ ", dst=" + dst.getAbsolutePath() + ']');
+ }
+ }
+ catch (IOException e) {
+ throw new StorageException("Failed to rename WAL segment [src=" +
+ toRen.file().getAbsolutePath() + ", dst=" + dst.getAbsolutePath() + ']', e);
+ }
+ }
+ }
+
+ /**
+ * Formatting working segments to {@link DataStorageConfiguration#getWalSegmentSize()} for work in a mmap or fsync case.
+ *
+ * @throws StorageException If an error occurs when formatting.
+ */
+ private void formatWorkSegments() throws StorageException {
+ assert isArchiverEnabled();
+
+ if (mode == WALMode.FSYNC || mmap) {
+ List<FileDescriptor> toFormat = Arrays.stream(scan(walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER)))
+ .filter(fd -> fd.file().length() < dsCfg.getWalSegmentSize()).collect(toList());
+
+ if (!toFormat.isEmpty()) {
+ if (log.isInfoEnabled()) {
+ log.info("WAL segments in working directory should have the same size: '" +
+ U.humanReadableByteCount(dsCfg.getWalSegmentSize()) + "'. Segments that need reformat " +
+ "found: " + F.viewReadOnly(toFormat, fd -> fd.file().getName()) + '.');
+ }
+
+ for (int i = 0, j = 0; i < toFormat.size(); i++) {
+ FileDescriptor fd = toFormat.get(i);
+
+ File tmpDst = new File(fd.file().getName() + TMP_SUFFIX);
+
+ try {
+ Files.copy(fd.file().toPath(), tmpDst.toPath());
+
+ if (log.isDebugEnabled()) {
+ log.debug("Start formatting WAL segment [filePath=" + tmpDst.getAbsolutePath() +
+ ", fileSize=" + U.humanReadableByteCount(tmpDst.length()) +
+ ", toSize=" + U.humanReadableByteCount(dsCfg.getWalSegmentSize()) + ']');
+ }
+
+ try (FileIO fileIO = ioFactory.create(tmpDst, CREATE, READ, WRITE)) {
+ int left = (int)(dsCfg.getWalSegmentSize() - tmpDst.length());
+
+ fileIO.position(tmpDst.length());
+
+ while (left > 0)
+ left -= fileIO.writeFully(FILL_BUF, 0, Math.min(FILL_BUF.length, left));
+
+ fileIO.force();
+ }
+
+ Files.move(tmpDst.toPath(), fd.file().toPath(), REPLACE_EXISTING, ATOMIC_MOVE);
+
+ if (log.isDebugEnabled())
+ log.debug("WAL segment formatted: " + fd.file().getAbsolutePath());
+
+ // Batch output.
+ if (log.isInfoEnabled() && (i == toFormat.size() - 1 || (i != 0 && i % 9 == 0))) {
+ log.info("WAL segments formatted: " + toFormat.get(j).file().getName() +
+ (i == j ? "" : " - " + fileName(i)));
+
+ j = i + 1;
+ }
+ }
+ catch (IOException e) {
+ throw new StorageException("Failed to format WAL segment: " + fd.file().getAbsolutePath(), e);
+ }
+ }
+ }
+ }
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java
new file mode 100644
index 0000000..43a26c2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE;
+import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_PATH;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Class for testing cases when WAL archive configuration was changed and the node was able to start.
+ */
+@RunWith(Parameterized.class)
+@WithSystemProperty(key = IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, value = "0.0")
+public class WalArchiveConsistencyTest extends GridCommonAbstractTest {
+ /**
+ * WAL mode.
+ */
+ @Parameterized.Parameter
+ public WALMode walMode;
+
+ /**
+ * @return Test parameters.
+ */
+ @Parameterized.Parameters(name = "walMode={0}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(
+ new Object[] {WALMode.LOG_ONLY},
+ new Object[] {WALMode.FSYNC}
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME))
+ .setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setWalSegments(10)
+ .setWalSegmentSize((int)U.MB)
+ .setMaxWalArchiveSize(10 * U.MB)
+ .setWalMode(walMode)
+ .setWalFsyncDelayNanos(100)
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setMaxSize(U.GB)
+ )
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteEx startGrid(int idx, Consumer<IgniteConfiguration> cfgOp) throws Exception {
+ IgniteEx n = super.startGrid(idx, cfgOp);
+
+ n.cluster().state(ClusterState.ACTIVE);
+ awaitPartitionMapExchange();
+
+ return n;
+ }
+
+ /**
+ * Verify that when switching WAL archive off -> on and increasing the
+ * number of WAL segments on restarting the node, the recovery will be consistent.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testIncreaseWalSegmentsWithoutTruncate() throws Exception {
+ checkRecoveryWithoutWalTruncate(12);
+ }
+
+ /**
+ * Verify that when switching WAL archive off -> on and decreasing the
+ * number of WAL segments on restarting the node, the recovery will be consistent.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDecreaseWalSegmentsWithoutTruncate() throws Exception {
+ checkRecoveryWithoutWalTruncate(4);
+ }
+
+ /**
+ * Checking that when switching WAL archive off -> on,
+ * reducing WAL segments at the start of the node
+ * and truncation some WAL segments, the recovery will be consistent.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDecreaseWalSegmentsWithTruncate0() throws Exception {
+ checkRecoveryWithWalTruncate(5);
+ }
+
+ /**
+ * Checking that when switching WAL archive off -> on,
+ * reducing WAL segments at the start of the node
+ * and truncation some WAL segments, the recovery will be consistent.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDecreaseWalSegmentsWithTruncate1() throws Exception {
+ checkRecoveryWithWalTruncate(6);
+ }
+
+ /**
+ * Checking that when switching WAL archive off -> on
+ * and truncation some WAL segments, the recovery will be consistent.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNotChangeWalSegmentsWithTruncate() throws Exception {
+ checkRecoveryWithWalTruncate(10);
+ }
+
+ /**
+ * Checking the consistency of recovery from a WAL when switching
+ * WAL archive off -> on and changing the number of segments on node restart.
+ * With truncate WAL segments.
+ *
+ * @param segments Segment count on node restart.
+ * @throws Exception If failed.
+ */
+ private void checkRecoveryWithWalTruncate(int segments) throws Exception {
+ IgniteEx n = startGrid(0, cfg -> {
+ cfg.getDataStorageConfiguration().setWalArchivePath(DFLT_WAL_PATH);
+ });
+
+ AtomicInteger key = new AtomicInteger();
+
+ dbMgr(n).checkpointReadLock();
+
+ try {
+ fill(n, 6, key);
+
+ // Protection against deleting WAL segments.
+ assertTrue(walMgr(n).reserve(new WALPointer(5, 0, 0)));
+ }
+ finally {
+ dbMgr(n).checkpointReadUnlock();
+ }
+
+ forceCheckpoint();
+ assertTrue(waitForCondition(() -> walMgr(n).lastTruncatedSegment() == 4, getTestTimeout()));
+
+ // Guaranteed recovery from WAL segments.
+ dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
+
+ fill(n, 2, key);
+
+ stopAllGrids();
+
+ IgniteEx n0 = startGrid(0, cfg -> {
+ cfg.getDataStorageConfiguration().setWalSegments(segments);
+ });
+
+ assertEquals(key.get(), n0.cache(DEFAULT_CACHE_NAME).size());
+ }
+
+ /**
+ * Checking the consistency of recovery from a WAL when switching
+ * WAL archive off -> on and changing the number of segments on node restart.
+ * Without truncate WAL segments.
+ *
+ * @param segments Segment count on node restart.
+ * @throws Exception If failed.
+ */
+ private void checkRecoveryWithoutWalTruncate(int segments) throws Exception {
+ IgniteEx n = startGrid(0, cfg -> {
+ cfg.getDataStorageConfiguration().setWalArchivePath(DFLT_WAL_PATH);
+ });
+
+ // Protection against deleting WAL segments.
+ assertTrue(walMgr(n).reserve(new WALPointer(0, 0, 0)));
+
+ AtomicInteger key = new AtomicInteger();
+
+ fill(n, 3, key);
+ forceCheckpoint();
+
+ // Guaranteed recovery from WAL segments.
+ dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
+
+ fill(n, 3, key);
+
+ stopAllGrids();
+
+ n = startGrid(0, cfg -> {
+ cfg.getDataStorageConfiguration().setWalSegments(segments);
+ });
+
+ assertEquals(key.get(), n.cache(DEFAULT_CACHE_NAME).size());
+ }
+
+ /**
+ * Filling the cache until N WAL segments are created.
+ *
+ * @param n Node.
+ * @param segments Number of segments.
+ * @param key Key counter.
+ */
+ private void fill(IgniteEx n, int segments, AtomicInteger key) {
+ long end = walMgr(n).currentSegment() + segments;
+ int i = 0;
+
+ while (walMgr(n).currentSegment() < end) {
+ int k = key.getAndIncrement();
+ int[] arr = new int[64];
+
+ Arrays.fill(arr, k);
+
+ n.cache(DEFAULT_CACHE_NAME).put(key, arr);
+
+ i++;
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Fill [keys=" + i + ", totalKeys=" + key.get() +
+ ", segNum=" + segments + ", currSeg=" + walMgr(n).currentSegment() + ']');
+ }
+ }
+
+ /**
+ * Getting WAL manager of node.
+ *
+ * @param n Node.
+ * @return WAL manager.
+ */
+ private FileWriteAheadLogManager walMgr(IgniteEx n) {
+ return (FileWriteAheadLogManager)n.context().cache().context().wal();
+ }
+
+ /**
+ * Getting db manager of node.
+ *
+ * @param n Node.
+ * @return Db manager.
+ */
+ private GridCacheDatabaseSharedManager dbMgr(IgniteEx n) {
+ return (GridCacheDatabaseSharedManager)n.context().cache().context().database();
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index d286c126..a0a8830 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1038,6 +1038,22 @@ public abstract class GridAbstractTest extends JUnitAssertAware {
* Starts new grid with given index.
*
* @param idx Index of the grid to start.
+ * @param cfgC Configuration mutator. Can be used to avoid oversimplification of {@link #getConfiguration()}.
+ * @return Started grid.
+ * @throws Exception If anything failed.
+ */
+ protected IgniteEx startGrid(int idx, Consumer<IgniteConfiguration> cfgC) throws Exception {
+ return startGrid(getTestIgniteInstanceName(idx), cfg -> {
+ cfgC.accept(cfg);
+
+ return cfg;
+ });
+ }
+
+ /**
+ * Starts new grid with given index.
+ *
+ * @param idx Index of the grid to start.
* @param cfgOp Configuration mutator. Can be used to avoid overcomplification of {@link #getConfiguration()}.
* @return Started grid.
* @throws Exception If anything failed.
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index ad60059..75aaed9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.CpTriggeredWa
import org.apache.ignite.internal.processors.cache.persistence.wal.ExplicitWalDeltaConsistencyTest;
import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBufferTest;
import org.apache.ignite.internal.processors.cache.persistence.wal.SysPropWalDeltaConsistencyTest;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WalArchiveConsistencyTest;
import org.apache.ignite.internal.processors.cache.persistence.wal.WalEnableDisableWithNodeShutdownTest;
import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAwareTest;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationPersistentTest;
@@ -137,6 +138,8 @@ public class IgnitePdsTestSuite {
GridTestUtils.addTestIfNeeded(suite, SegmentAwareTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, WalArchiveConsistencyTest.class, ignoredTests);
+
return suite;
}