You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by il...@apache.org on 2020/12/23 13:47:11 UTC
[ignite] branch master updated: IGNITE-13831 Move WAL archive
cleanup from checkpoint to rollover - Fixes #8563.
This is an automated email from the ASF dual-hosted git repository.
ilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1888416 IGNITE-13831 Move WAL archive cleanup from checkpoint to rollover - Fixes #8563.
1888416 is described below
commit 1888416f563f67f3b786609a27951736d2948a5c
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed Dec 23 16:43:35 2020 +0300
IGNITE-13831 Move WAL archive cleanup from checkpoint to rollover - Fixes #8563.
Signed-off-by: Ilya Kasnacheev <il...@gmail.com>
---
.../pagemem/wal/IgniteWriteAheadLogManager.java | 5 -
.../GridCacheDatabaseSharedManager.java | 7 +-
.../cache/persistence/checkpoint/Checkpoint.java | 19 +-
.../persistence/checkpoint/CheckpointHistory.java | 106 +----
.../persistence/checkpoint/CheckpointManager.java | 7 +-
.../checkpoint/CheckpointMarkersStorage.java | 18 +-
.../cache/persistence/checkpoint/Checkpointer.java | 16 +-
.../persistence/wal/FileWriteAheadLogManager.java | 446 ++++++++++++++-------
.../wal/aware/SegmentArchiveSizeStorage.java | 104 +++++
.../wal/aware/SegmentArchivedStorage.java | 17 -
.../cache/persistence/wal/aware/SegmentAware.java | 98 ++++-
.../wal/aware/SegmentCompressStorage.java | 2 +-
.../persistence/wal/aware/SegmentLockStorage.java | 2 +-
.../persistence/wal/aware/SegmentObservable.java | 2 +-
.../wal/aware/SegmentReservationStorage.java | 67 +++-
.../wal/aware/SegmentTruncateStorage.java | 151 +++++++
.../apache/ignite/internal/util/IgniteUtils.java | 20 +
.../db/wal/WalDeletionArchiveAbstractTest.java | 120 ++++--
.../cache/persistence/pagemem/NoOpWALManager.java | 5 -
.../persistence/wal/aware/SegmentAwareTest.java | 95 +++++
.../ignite/internal/util/IgniteUtilsSelfTest.java | 32 ++
21 files changed, 989 insertions(+), 350 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index f3d85c5..a4ed599 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -186,11 +186,6 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni
public long lastCompactedSegment();
/**
- * @return Max allowed index of archived segment to delete or -1 if it does not exist.
- */
- public long maxArchivedSegmentToDelete();
-
- /**
* Checks if WAL segment is under lock or reserved
*
* @param ptr Pointer to check.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 06e00f9..98ee778 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2744,11 +2744,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
- * Wal truncate callBack.
+ * Wal truncate callback.
*
- * @param highBound WALPointer.
+ * @param highBound Upper bound.
+ * @throws IgniteCheckedException If failed.
*/
- public void onWalTruncated(WALPointer highBound) throws IgniteCheckedException {
+ public void onWalTruncated(@Nullable WALPointer highBound) throws IgniteCheckedException {
checkpointManager.removeCheckpointsUntil(highBound);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpoint.java
index e6a378b..bc842dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpoint.java
@@ -21,7 +21,6 @@ import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -34,26 +33,25 @@ class Checkpoint {
/** Checkpoint pages. */
final GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages;
- /** */
+ /** Checkpoint progress status. */
final CheckpointProgressImpl progress;
- /** Number of deleted WAL files. */
- int walFilesDeleted;
-
/** WAL segments fully covered by this checkpoint. */
IgniteBiTuple<Long, Long> walSegsCoveredRange;
- /** */
+ /** Number of dirty pages. */
final int pagesSize;
/**
+ * Constructor.
+ *
* @param cpEntry Checkpoint entry.
* @param cpPages Pages to write to the page store.
* @param progress Checkpoint progress status.
*/
Checkpoint(
@Nullable CheckpointEntry cpEntry,
- @NotNull GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages,
+ GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages,
CheckpointProgressImpl progress
) {
this.cpEntry = cpEntry;
@@ -71,13 +69,6 @@ class Checkpoint {
}
/**
- * @param walFilesDeleted Wal files deleted.
- */
- public void walFilesDeleted(int walFilesDeleted) {
- this.walFilesDeleted = walFilesDeleted;
- }
-
- /**
* @param walSegsCoveredRange WAL segments fully covered by this checkpoint.
*/
public void walSegsCoveredRange(final IgniteBiTuple<Long, Long> walSegsCoveredRange) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
index 869f0da..87be2eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
@@ -87,22 +87,26 @@ public class CheckpointHistory {
* Constructor.
*
* @param dsCfg Data storage configuration.
- * @param wal Write ahead log.
+ * @param logFun Function for getting a logger.
+ * @param wal WAL manager.
* @param inapplicable Checkpoint inapplicable filter.
*/
CheckpointHistory(
DataStorageConfiguration dsCfg,
- Function<Class<?>, IgniteLogger> logger,
+ Function<Class<?>, IgniteLogger> logFun,
IgniteWriteAheadLogManager wal,
IgniteThrowableBiPredicate<Long, Integer> inapplicable
) {
- this.log = logger.apply(getClass());
+ this.log = logFun.apply(getClass());
this.wal = wal;
this.checkpointInapplicable = inapplicable;
isWalTruncationEnabled = dsCfg.getMaxWalArchiveSize() != DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE;
- maxCpHistMemSize = IgniteSystemProperties.getInteger(IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, DFLT_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE);
+ maxCpHistMemSize = IgniteSystemProperties.getInteger(
+ IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE,
+ DFLT_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE
+ );
reservationDisabled = dsCfg.getWalMode() == WALMode.NONE;
}
@@ -228,7 +232,7 @@ public class CheckpointHistory {
addCpGroupStatesToEarliestCpMap(entry, states);
}
catch (IgniteCheckedException ex) {
- U.warn(log, "Failed to process checkpoint: " + (entry != null ? entry : "none"), ex);
+ U.warn(log, "Failed to process checkpoint: " + entry, ex);
earliestCp.clear();
}
@@ -314,6 +318,7 @@ public class CheckpointHistory {
/**
* Clears checkpoint history after WAL truncation.
*
+ * @param highBound Upper bound.
* @return List of checkpoint entries removed from history.
*/
public List<CheckpointEntry> onWalTruncated(WALPointer highBound) {
@@ -391,100 +396,13 @@ public class CheckpointHistory {
/**
* Logs and clears checkpoint history after checkpoint finish.
*
+ * @param chp Finished checkpoint.
* @return List of checkpoints removed from history.
*/
public List<CheckpointEntry> onCheckpointFinished(Checkpoint chp) {
chp.walSegsCoveredRange(calculateWalSegmentsCovered());
- int removeCount = isWalTruncationEnabled
- ? checkpointCountUntilDeleteByArchiveSize()
- : (histMap.size() - maxCpHistMemSize);
-
- if (removeCount <= 0)
- return Collections.emptyList();
-
- List<CheckpointEntry> deletedCheckpoints = removeCheckpoints(removeCount);
-
- if (isWalTruncationEnabled) {
- int deleted = wal.truncate(firstCheckpointPointer());
-
- chp.walFilesDeleted(deleted);
- }
-
- return deletedCheckpoints;
- }
-
- /**
- * @param first One of pointers to choose the newest.
- * @param second One of pointers to choose the newest.
- * @return The newest pointer from input ones.
- */
- private WALPointer newerPointer(WALPointer first, WALPointer second) {
- if (first == null)
- return second;
-
- if (second == null)
- return first;
-
- return first.index() > second.index() ? first : second;
- }
-
- /**
- * Calculate mark until delete by maximum checkpoint history memory size.
- *
- * @return Checkpoint mark until which checkpoints can be deleted(not including this pointer).
- */
- private WALPointer checkpointMarkUntilDeleteByMemorySize() {
- if (histMap.size() <= maxCpHistMemSize)
- return null;
-
- int calculatedCpHistSize = maxCpHistMemSize;
-
- for (Map.Entry<Long, CheckpointEntry> entry : histMap.entrySet()) {
- if (histMap.size() <= calculatedCpHistSize++)
- return entry.getValue().checkpointMark();
- }
-
- return lastCheckpoint().checkpointMark();
- }
-
- /**
- * Calculate count of checkpoints to delete by maximum allowed archive size.
- *
- * @return Checkpoint count to be deleted.
- */
- private int checkpointCountUntilDeleteByArchiveSize() {
- long absFileIdxToDel = wal.maxArchivedSegmentToDelete();
-
- if (absFileIdxToDel < 0)
- return 0;
-
- long fileUntilDel = absFileIdxToDel + 1;
-
- long checkpointFileIdx = absFileIdx(lastCheckpoint());
-
- int countToRemove = 0;
-
- for (CheckpointEntry cpEntry : histMap.values()) {
- long currFileIdx = absFileIdx(cpEntry);
-
- if (checkpointFileIdx <= currFileIdx || fileUntilDel <= currFileIdx)
- return countToRemove;
-
- countToRemove++;
- }
-
- return histMap.size() - 1;
- }
-
- /**
- * Retrieve absolute file index by checkpoint entry.
- *
- * @param pointer checkpoint entry for which need to calculate absolute file index.
- * @return absolute file index for given checkpoint entry.
- */
- private long absFileIdx(CheckpointEntry pointer) {
- return pointer.checkpointMark().index();
+ return removeCheckpoints(isWalTruncationEnabled ? 0 : histMap.size() - maxCpHistMemSize);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
index 7fe7002..072d3bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
@@ -300,11 +300,12 @@ public class CheckpointManager {
}
/**
- * Wal truncate callBack.
+ * Wal truncate callback.
*
- * @param highBound WALPointer.
+ * @param highBound Upper bound.
+ * @throws IgniteCheckedException If failed.
*/
- public void removeCheckpointsUntil(WALPointer highBound) throws IgniteCheckedException {
+ public void removeCheckpointsUntil(@Nullable WALPointer highBound) throws IgniteCheckedException {
checkpointMarkersStorage.removeCheckpointsUntil(highBound);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java
index b421a33..d601d53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java
@@ -141,24 +141,28 @@ public class CheckpointMarkersStorage {
}
/**
- * Wal truncate callBack.
+ * Wal truncate callback.
*
- * @param highBound WALPointer.
+ * @param highBound Upper bound.
+ * @throws IgniteCheckedException If failed.
*/
- public void removeCheckpointsUntil(WALPointer highBound) throws IgniteCheckedException {
- List<CheckpointEntry> removedFromHistory = history().onWalTruncated(highBound);
+ public void removeCheckpointsUntil(@Nullable WALPointer highBound) throws IgniteCheckedException {
+ List<CheckpointEntry> rmvFromHist = history().onWalTruncated(highBound);
- for (CheckpointEntry cp : removedFromHistory)
+ for (CheckpointEntry cp : rmvFromHist)
removeCheckpointFiles(cp);
}
/**
* Logs and clears checkpoint history after checkpoint finish.
+ *
+ * @param chp Finished checkpoint.
+ * @throws IgniteCheckedException If failed.
*/
public void onCheckpointFinished(Checkpoint chp) throws IgniteCheckedException {
- List<CheckpointEntry> removedFromHistory = history().onCheckpointFinished(chp);
+ List<CheckpointEntry> rmvFromHist = history().onCheckpointFinished(chp);
- for (CheckpointEntry cp : removedFromHistory)
+ for (CheckpointEntry cp : rmvFromHist)
removeCheckpointFiles(cp);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
index 6747e58..63440b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
@@ -456,16 +456,12 @@ public class Checkpointer extends GridWorker {
if (chp.hasDelta() || destroyedPartitionsCnt > 0) {
if (log.isInfoEnabled()) {
- String walSegsCoveredMsg = chp.walSegsCoveredRange == null ? "" : prepareWalSegsCoveredMsg(chp.walSegsCoveredRange);
-
log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " +
- "walSegmentsCleared=%d, walSegmentsCovered=%s, markDuration=%dms, pagesWrite=%dms, fsync=%dms, " +
- "total=%dms]",
+ "walSegmentsCovered=%s, markDuration=%dms, pagesWrite=%dms, fsync=%dms, total=%dms]",
chp.cpEntry != null ? chp.cpEntry.checkpointId() : "",
chp.pagesSize,
chp.cpEntry != null ? chp.cpEntry.checkpointMark() : "",
- chp.walFilesDeleted,
- walSegsCoveredMsg,
+ walRangeStr(chp.walSegsCoveredRange),
tracker.markDuration(),
tracker.pagesWriteDuration(),
tracker.fsyncDuration(),
@@ -591,9 +587,15 @@ public class Checkpointer extends GridWorker {
}
/**
+ * Creates a string of a range WAL segments.
+ *
+ * @param walRange Range of WAL segments.
* @return The message about how many WAL segments was between previous checkpoint and current one.
*/
- private String prepareWalSegsCoveredMsg(IgniteBiTuple<Long, Long> walRange) {
+ private String walRangeStr(@Nullable IgniteBiTuple<Long, Long> walRange) {
+ if (walRange == null)
+ return "";
+
String res;
long startIdx = walRange.get1();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index f22aed9..4b97487 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -51,7 +51,6 @@ import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
@@ -257,7 +256,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*/
private final long maxSegCountWithoutCheckpoint;
- /** Size of wal archive since which removing of old archive should be started */
+ /** Size of wal archive since which removing of old archive should be started. */
private final long allowedThresholdWalArchiveSize;
/** */
@@ -319,6 +318,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** Decompressor. */
@Nullable private FileDecompressor decompressor;
+ /**
+ * Cleaner of segments from WAL archive when the maximum size is reached.
+ * Will not work if WAL archive size is {@link DataStorageConfiguration#UNLIMITED_WAL_ARCHIVE}.
+ */
+ @Nullable private FileCleaner cleaner;
+
/** Current log segment handle. */
private volatile FileWriteHandle currHnd;
@@ -494,8 +499,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (isArchiverEnabled())
archiver = new FileArchiver(segmentAware, log);
- else
- archiver = null;
+
+ if (!walArchiveUnlimited())
+ cleaner = new FileCleaner(log);
segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir, segmentAware, dsCfg);
@@ -534,11 +540,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- *
+ * Running workers of WAL archive.
*/
- private void startArchiverAndCompressor() {
+ private void startArchiveWorkers() {
segmentAware.reset();
+ segmentAware.resetWalArchiveSizes();
+ segmentAware.addCurrentWalArchiveSize(totalSize(walArchiveFiles()));
+
if (isArchiverEnabled()) {
assert archiver != null : "FileArchiver should be initialized.";
@@ -554,6 +563,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
decompressor.restart();
}
+
+ if (!walArchiveUnlimited()) {
+ assert cleaner != null : "FileCleaner should be initialized.";
+
+ cleaner.restart();
+ }
}
/**
@@ -654,6 +669,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (decompressor != null)
decompressor.shutdown();
+
+ if (cleaner != null)
+ cleaner.shutdown();
}
catch (IgniteInterruptedCheckedException e) {
U.error(log, "Failed to gracefully shutdown WAL components, thread was interrupted.", e);
@@ -703,7 +721,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
assert currHnd == null;
- startArchiverAndCompressor();
+ startArchiveWorkers();
assert (isArchiverEnabled() && archiver != null) || (!isArchiverEnabled() && archiver == null) :
"Trying to restore FileWriteHandle on deactivated write ahead log manager";
@@ -1051,6 +1069,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
|| !segmentAware.minReserveIndex(desc.idx)) // We cannot delete reserved segment.
return deleted;
+ long len = desc.file.length();
+
if (!desc.file.delete()) {
U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
desc.file.getAbsolutePath());
@@ -1059,6 +1079,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
deleted++;
segmentSize.remove(desc.idx());
+ segmentAware.addCurrentWalArchiveSize(-len);
}
// Bump up the oldest archive segment index.
@@ -1087,6 +1108,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** {@inheritDoc} */
@Override public void notchLastCheckpointPtr(WALPointer ptr) {
lastCheckpointPtr = ptr;
+
+ segmentAware.lastCheckpointIdx(ptr.index());
}
/** {@inheritDoc} */
@@ -1261,24 +1284,38 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
switchSegmentRecordOffset.set(idx, hnd.getSwitchSegmentRecordOffset());
}
+ if (archiver == null)
+ segmentAware.addReservedWalArchiveSize(maxWalSegmentSize);
+
FileWriteHandle next;
try {
- next = initNextWriteHandle(cur);
- }
- catch (IgniteCheckedException e) {
- //Allow to avoid forever waiting in other threads.
- cur.signalNextAvailable();
+ try {
+ next = initNextWriteHandle(cur);
+ }
+ catch (IgniteCheckedException e) {
+ //Allow to avoid forever waiting in other threads.
+ cur.signalNextAvailable();
- throw e;
- }
+ throw e;
+ }
- if (rec != null) {
- WALPointer ptr = next.addRecord(rec);
+ if (rec != null) {
+ WALPointer ptr = next.addRecord(rec);
- assert ptr != null;
+ assert ptr != null;
+ }
+
+ segmentSize.put(next.getSegmentId(), maxWalSegmentSize);
+
+ if (archiver == null)
+ segmentAware.addCurrentWalArchiveSize(maxWalSegmentSize);
+ }
+ finally {
+ if (archiver == null)
+ segmentAware.addReservedWalArchiveSize(-maxWalSegmentSize);
}
- if (next.getSegmentId() - lashCheckpointFileIdx() >= maxSegCountWithoutCheckpoint)
+ if (next.getSegmentId() - lastCheckpointPtr.index() >= maxSegCountWithoutCheckpoint)
cctx.database().forceCheckpoint("too big size of WAL without checkpoint");
boolean updated = updateCurrentHandle(next, hnd);
@@ -1298,15 +1335,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * Give last checkpoint file idx.
- */
- private long lashCheckpointFileIdx() {
- WALPointer lastCheckpointMark = cctx.database().lastCheckpointMarkWalPointer();
-
- return lastCheckpointMark == null ? 0 : lastCheckpointMark.index();
- }
-
- /**
* @param lastReadPtr Last read WAL file pointer.
* @return Initialized file write handle.
* @throws StorageException If failed to initialize WAL write handle.
@@ -1358,6 +1386,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
FileDescriptor[] walArchiveFiles = walArchiveFiles();
segmentAware.minReserveIndex(F.isEmpty(walArchiveFiles) ? -1 : walArchiveFiles[0].idx - 1);
+ segmentAware.lastTruncatedArchiveIdx(F.isEmpty(walArchiveFiles) ? -1 : walArchiveFiles[0].idx - 1);
if (archiver0 == null)
segmentAware.setLastArchivedAbsoluteIndex(absIdx - 1);
@@ -1627,37 +1656,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*
* @return Raw or compressed WAL segments from archive.
*/
- private FileDescriptor[] walArchiveFiles() {
+ public FileDescriptor[] walArchiveFiles() {
return scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
}
- /** {@inheritDoc} */
- @Override public long maxArchivedSegmentToDelete() {
- //When maxWalArchiveSize==-1 deleting files is not permitted.
- if (dsCfg.getMaxWalArchiveSize() == DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE)
- return -1;
-
- FileDescriptor[] archivedFiles = walArchiveFiles();
-
- Long totalArchiveSize = Stream.of(archivedFiles)
- .map(desc -> desc.file().length())
- .reduce(0L, Long::sum);
-
- if (archivedFiles.length == 0 || totalArchiveSize < allowedThresholdWalArchiveSize)
- return -1;
-
- long sizeOfOldestArchivedFiles = 0;
-
- for (FileDescriptor desc : archivedFiles) {
- sizeOfOldestArchivedFiles += desc.file().length();
-
- if (totalArchiveSize - sizeOfOldestArchivedFiles < allowedThresholdWalArchiveSize)
- return desc.getIdx();
- }
-
- return archivedFiles[archivedFiles.length - 1].getIdx();
- }
-
/**
* @return Sorted WAL files descriptors.
*/
@@ -1997,24 +1999,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
", origFile=" + origFile.getAbsolutePath() + ", dstFile=" + dstFile.getAbsolutePath() + ']');
}
- try {
- Files.deleteIfExists(dstTmpFile.toPath());
-
- boolean copied = false;
-
- long offs = switchSegmentRecordOffset.get((int)segIdx);
-
- if (offs > 0) {
- switchSegmentRecordOffset.set((int)segIdx, 0);
+ assert switchSegmentRecordOffset != null;
- if (offs < origFile.length()) {
- GridFileUtils.copy(ioFactory, origFile, ioFactory, dstTmpFile, offs);
+ long offs = switchSegmentRecordOffset.getAndSet((int)segIdx, 0);
+ long origLen = origFile.length();
- copied = true;
- }
- }
+ long reservedSize = offs > 0 && offs < origLen ? offs : origLen;
+ segmentAware.addReservedWalArchiveSize(reservedSize);
- if (!copied)
+ try {
+ if (offs > 0 && offs < origLen)
+ GridFileUtils.copy(ioFactory, origFile, ioFactory, dstTmpFile, offs);
+ else
Files.copy(origFile.toPath(), dstTmpFile.toPath());
Files.move(dstTmpFile.toPath(), dstFile.toPath());
@@ -2026,12 +2022,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
segmentSize.put(absIdx, dstFile.length());
+ segmentAware.addCurrentWalArchiveSize(dstFile.length());
}
catch (IOException e) {
+ deleteArchiveFiles(dstFile, dstTmpFile);
+
throw new StorageException("Failed to archive WAL segment [" +
"srcFile=" + origFile.getAbsolutePath() +
", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e);
}
+ finally {
+ segmentAware.addReservedWalArchiveSize(-reservedSize);
+ }
if (log.isInfoEnabled()) {
log.info("Copied file [src=" + origFile.getAbsolutePath() +
@@ -2217,8 +2219,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if ((segIdx = tryReserveNextSegmentOrWait()) == -1)
continue;
- deleteObsoleteRawSegments();
-
String segmentFileName = fileName(segIdx);
File tmpZip = new File(walArchiveDir, segmentFileName + ZIP_SUFFIX + TMP_SUFFIX);
@@ -2227,33 +2227,48 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
File raw = new File(walArchiveDir, segmentFileName);
- if (!Files.exists(raw.toPath()))
- throw new IgniteCheckedException("WAL archive segment is missing: " + raw);
+ long reservedSize = raw.length();
+ segmentAware.addReservedWalArchiveSize(reservedSize);
- compressSegmentToFile(segIdx, raw, tmpZip);
+ try {
+ deleteObsoleteRawSegments();
- Files.move(tmpZip.toPath(), zip.toPath());
+ if (!Files.exists(raw.toPath()))
+ throw new IgniteCheckedException("WAL archive segment is missing: " + raw);
- try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) {
- f0.force();
+ compressSegmentToFile(segIdx, raw, tmpZip);
+
+ Files.move(tmpZip.toPath(), zip.toPath());
+
+ try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) {
+ f0.force();
+ }
+
+ segmentSize.put(segIdx, zip.length());
+ segmentAware.addCurrentWalArchiveSize(zip.length());
+
+ segmentAware.onSegmentCompressed(segIdx);
+
+ if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode())
+ evt.record(new WalSegmentCompactedEvent(cctx.localNode(), segIdx, zip.getAbsoluteFile()));
}
+ catch (IgniteCheckedException | IOException e) {
+ deleteArchiveFiles(zip, tmpZip);
- segmentAware.onSegmentCompressed(segIdx);
+ lastCompressionError = e;
- if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode())
- evt.record(new WalSegmentCompactedEvent(cctx.localNode(), segIdx, zip.getAbsoluteFile()));
+ U.error(log, "Compression of WAL segment [idx=" + segIdx +
+ "] was skipped due to unexpected error", lastCompressionError);
+
+ segmentAware.onSegmentCompressed(segIdx);
+ }
+ finally {
+ segmentAware.addReservedWalArchiveSize(-reservedSize);
+ }
}
catch (IgniteInterruptedCheckedException ignore) {
Thread.currentThread().interrupt();
}
- catch (IgniteCheckedException | IOException e) {
- lastCompressionError = e;
-
- U.error(log, "Compression of WAL segment [idx=" + segIdx +
- "] was skipped due to unexpected error", lastCompressionError);
-
- segmentAware.onSegmentCompressed(segIdx);
- }
finally {
if (segIdx != -1L)
release(new WALPointer(segIdx, 0, 0));
@@ -2313,8 +2328,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
zos.write(heapBuf.array());
}
-
- segmentSize.put(idx, zip.length());
}
/**
@@ -2342,7 +2355,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* Deletes raw WAL segments if they aren't locked and already have compressed copies of themselves.
*/
private void deleteObsoleteRawSegments() {
- FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
+ FileDescriptor[] descs = walArchiveFiles();
Set<Long> indices = new HashSet<>();
Set<Long> duplicateIndices = new HashSet<>();
@@ -2360,13 +2373,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (segmentReservedOrLocked(desc.idx))
return;
- if (desc.idx < lastCheckpointPtr.index() && duplicateIndices.contains(desc.idx)) {
- if (desc.file.exists() && !desc.file.delete()) {
- U.warn(log, "Failed to remove obsolete WAL segment " +
- "(make sure the process has enough rights): " + desc.file.getAbsolutePath() +
- ", exists: " + desc.file.exists());
- }
- }
+ if (desc.idx < lastCheckpointPtr.index() && duplicateIndices.contains(desc.idx))
+ segmentAware.addCurrentWalArchiveSize(-deleteArchiveFiles(desc.file));
}
}
}
@@ -2400,24 +2408,35 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
while (!isCancelled()) {
long segmentToDecompress = -1L;
+ blockingSectionBegin();
+
try {
- blockingSectionBegin();
+ segmentToDecompress = segmentsQueue.take();
+ }
+ finally {
+ blockingSectionEnd();
+ }
- try {
- segmentToDecompress = segmentsQueue.take();
- }
- finally {
- blockingSectionEnd();
- }
+ if (isCancelled())
+ break;
- if (isCancelled())
- break;
+ if (segmentToDecompress == -1)
+ continue;
+
+ String segmentFileName = fileName(segmentToDecompress);
+
+ File zip = new File(walArchiveDir, segmentFileName + ZIP_SUFFIX);
+ File unzipTmp = new File(walArchiveDir, segmentFileName + TMP_SUFFIX);
+ File unzip = new File(walArchiveDir, segmentFileName);
+
+ long reservedSize = U.uncompressedSize(zip);
+ segmentAware.addReservedWalArchiveSize(reservedSize);
- String segmentFileName = fileName(segmentToDecompress);
+ IgniteCheckedException ex = null;
- File zip = new File(walArchiveDir, segmentFileName + ZIP_SUFFIX);
- File unzipTmp = new File(walArchiveDir, segmentFileName + TMP_SUFFIX);
- File unzip = new File(walArchiveDir, segmentFileName);
+ try {
+ if (unzip.exists())
+ throw new FileAlreadyExistsException(unzip.getAbsolutePath());
try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip)));
FileIO io = ioFactory.create(unzipTmp)) {
@@ -2427,32 +2446,30 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
updateHeartbeat();
}
- try {
- Files.move(unzipTmp.toPath(), unzip.toPath());
- }
- catch (FileAlreadyExistsException e) {
+ Files.move(unzipTmp.toPath(), unzip.toPath());
+
+ segmentAware.addCurrentWalArchiveSize(unzip.length());
+ }
+ catch (IOException e) {
+ deleteArchiveFiles(unzipTmp);
+
+ if (e instanceof FileAlreadyExistsException) {
U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " +
"[tmp=" + unzipTmp + ", raw=" + unzip + "]", e);
-
- if (!unzipTmp.delete())
- U.error(log, "Can't delete temporary unzipped segment [tmp=" + unzipTmp + "]");
}
-
- updateHeartbeat();
-
- synchronized (this) {
- decompressionFutures.remove(segmentToDecompress).onDone();
+ else if (!isCancelled) {
+ ex = new IgniteCheckedException("Error during WAL segment decompression [segmentIdx=" +
+ segmentToDecompress + "]", e);
}
}
- catch (IOException ex) {
- if (!isCancelled && segmentToDecompress != -1L) {
- IgniteCheckedException e = new IgniteCheckedException("Error during WAL segment " +
- "decompression [segmentIdx=" + segmentToDecompress + "]", ex);
+ finally {
+ segmentAware.addReservedWalArchiveSize(-reservedSize);
+ }
- synchronized (this) {
- decompressionFutures.remove(segmentToDecompress).onDone(e);
- }
- }
+ updateHeartbeat();
+
+ synchronized (this) {
+ decompressionFutures.remove(segmentToDecompress).onDone(ex);
}
}
}
@@ -3062,8 +3079,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
else
res = CURR_HND_UPD.compareAndSet(this, c, n);
- segmentSize.put(n.getSegmentId(), maxWalSegmentSize);
-
return res;
}
@@ -3077,4 +3092,167 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
return name != null && (WAL_NAME_PATTERN.matcher(name).matches() ||
WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(name).matches());
}
+
+ /**
+ * Getting last truncated segment.
+ *
+ * @return Absolut segment index.
+ */
+ public long lastTruncatedSegment() {
+ return segmentAware.lastTruncatedArchiveIdx();
+ }
+
+ /**
+ * Total size of the segments in bytes.
+ *
+ * @return Size in bytes.
+ */
+ public long totalSize(FileDescriptor... fileDescriptors) {
+ long len = 0;
+
+ for (FileDescriptor descriptor : fileDescriptors)
+ len += descriptor.file.length();
+
+ return len;
+ }
+
+ /**
+ * Check if WAL archive is unlimited.
+ *
+ * @return {@code True} if unlimited.
+ */
+ private boolean walArchiveUnlimited() {
+ return dsCfg.getMaxWalArchiveSize() == DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE;
+ }
+
+ /**
+ * Removing files from {@link #walArchiveDir}.
+ *
+ * @param files Files from {@link #walArchiveDir}.
+ * @return Total deleted size in bytes.
+ */
+ private long deleteArchiveFiles(File... files) {
+ long size = 0;
+
+ for (File file : files) {
+ if (file.exists()) {
+ long len = file.length();
+
+ if (file.delete())
+ size += len;
+ else if (file.exists()) {
+ U.warn(log, "Unable to delete file from WAL archive" +
+ " (make sure the process has enough rights): " + file.getAbsolutePath());
+ }
+ }
+ }
+
+ return size;
+ }
+
+ /**
+ * Worker for an asynchronous WAL archive cleanup that starts when the maximum size is exceeded.
+ * {@link SegmentAware#awaitExceedMaxArchiveSize} is used to determine if the maximum is exceeded.
+ */
+ private class FileCleaner extends GridWorker {
+ /**
+ * Constructor.
+ *
+ * @param log Logger.
+ */
+ public FileCleaner(IgniteLogger log) {
+ super(cctx.igniteInstanceName(), "wal-file-cleaner%" + cctx.igniteInstanceName(), log);
+
+ assert !walArchiveUnlimited();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ Throwable err = null;
+
+ try {
+ while (!isCancelled()) {
+ segmentAware.awaitExceedMaxArchiveSize(allowedThresholdWalArchiveSize);
+ segmentAware.awaitAvailableTruncateArchive();
+
+ FileDescriptor[] walArchiveFiles = walArchiveFiles();
+
+ FileDescriptor high = null;
+
+ long size = 0;
+
+ for (FileDescriptor fileDesc : walArchiveFiles) {
+ if (fileDesc.idx >= lastCheckpointPtr.index() || segmentAware.reserved(fileDesc.idx))
+ break;
+ else {
+ high = fileDesc;
+
+ // Ensure that there will be exactly removed at least one segment.
+ if ((size += fileDesc.file.length()) > allowedThresholdWalArchiveSize)
+ break;
+ }
+ }
+
+ if (high != null) {
+ WALPointer highPtr = new WALPointer(high.idx + 1, 0, 0);
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting to clean WAL archive [highIdx=" + highPtr.index()
+ + ", currSize=" + U.humanReadableByteCount(totalSize(walArchiveFiles))
+ + ", maxSize=" + U.humanReadableByteCount(dsCfg.getMaxWalArchiveSize()) + ']');
+ }
+
+ ((GridCacheDatabaseSharedManager)cctx.database()).onWalTruncated(highPtr);
+
+ int truncated = truncate(highPtr);
+
+ if (log.isInfoEnabled()) {
+ log.info("Finish clean WAL archive [cleanCnt=" + truncated
+ + ", currSize=" + U.humanReadableByteCount(totalSize(walArchiveFiles()))
+ + ", maxSize=" + U.humanReadableByteCount(dsCfg.getMaxWalArchiveSize()) + ']');
+ }
+ }
+ }
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ Thread.currentThread().interrupt();
+
+ isCancelled = true;
+ }
+ catch (Throwable t) {
+ err = t;
+ }
+ finally {
+ if (err == null && !isCancelled())
+ err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
+
+ if (err instanceof OutOfMemoryError)
+ failureProcessor.process(new FailureContext(CRITICAL_ERROR, err));
+ else if (err != null)
+ failureProcessor.process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
+ }
+ }
+
+ /**
+ * Shutdown worker.
+ *
+ * @throws IgniteInterruptedCheckedException If failed to wait for worker shutdown.
+ */
+ private void shutdown() throws IgniteInterruptedCheckedException {
+ isCancelled = true;
+
+ U.join(this);
+ }
+
+ /**
+ * Restart worker in IgniteThread.
+ */
+ public void restart() {
+ assert runner() == null : "FileCleaner is still running";
+
+ isCancelled = false;
+
+ new IgniteThread(this).start();
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchiveSizeStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchiveSizeStorage.java
new file mode 100644
index 0000000..76d6022
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchiveSizeStorage.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
+
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+
+/**
+ * Storage WAL archive size.
+ * Allows to track the exceeding of the maximum archive size.
+ */
+class SegmentArchiveSizeStorage {
+ /** Current WAL archive size in bytes. */
+ private long curr;
+
+ /** Reserved WAL archive size in bytes. */
+ private long reserved;
+
+ /** Flag of interrupt waiting on this object. */
+ private volatile boolean interrupted;
+
+ /**
+ * Adding current WAL archive size in bytes.
+ *
+ * @param size Size in bytes.
+ */
+ synchronized void addCurrentSize(long size) {
+ curr += size;
+
+ if (size > 0)
+ notifyAll();
+ }
+
+ /**
+ * Adding reserved WAL archive size in bytes.
+ * Defines a hint to determine if the maximum size is exceeded before a new segment is archived.
+ *
+ * @param size Size in bytes.
+ */
+ synchronized void addReservedSize(long size) {
+ reserved += size;
+
+ if (size > 0)
+ notifyAll();
+ }
+
+ /**
+ * Reset the current and reserved WAL archive sizes.
+ */
+ synchronized void resetSizes() {
+ curr = 0;
+ reserved = 0;
+ }
+
+ /**
+ * Waiting for exceeding the maximum WAL archive size.
+ * To track size of WAL archive, need to use {@link #addCurrentSize} and {@link #addReservedSize}.
+ *
+ * @param max Maximum WAL archive size in bytes.
+ * @throws IgniteInterruptedCheckedException If it was interrupted.
+ */
+ synchronized void awaitExceedMaxSize(long max) throws IgniteInterruptedCheckedException {
+ try {
+ while (max - (curr + reserved) > 0 && !interrupted)
+ wait();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedCheckedException(e);
+ }
+
+ if (interrupted)
+ throw new IgniteInterruptedCheckedException("Interrupt waiting of exceed max archive size");
+ }
+
+ /**
+ * Interrupt waiting on this object.
+ */
+ synchronized void interrupt() {
+ interrupted = true;
+
+ notifyAll();
+ }
+
+ /**
+ * Reset interrupted flag.
+ */
+ void reset() {
+ interrupted = false;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
index 53b3b59..12a1e5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
@@ -37,9 +37,6 @@ class SegmentArchivedStorage extends SegmentObservable {
*/
private volatile long lastAbsArchivedIdx = -1;
- /** Latest truncated segment. */
- private volatile long lastTruncatedArchiveIdx = -1;
-
/**
* @param segmentLockStorage Protects WAL work segments from moving.
*/
@@ -137,18 +134,4 @@ class SegmentArchivedStorage extends SegmentObservable {
synchronized void onSegmentUnlocked(long segmentId) {
notifyAll();
}
-
- /**
- * @param lastTruncatedArchiveIdx Last truncated segment.
- */
- void lastTruncatedArchiveIdx(long lastTruncatedArchiveIdx) {
- this.lastTruncatedArchiveIdx = lastTruncatedArchiveIdx;
- }
-
- /**
- * @return Last truncated segment.
- */
- long lastTruncatedArchiveIdx() {
- return lastTruncatedArchiveIdx;
- }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
index 8d676c9..48b16b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
@@ -39,6 +39,12 @@ public class SegmentAware {
/** Storage of absolute current segment index. */
private final SegmentCurrentStateStorage segmentCurrStateStorage;
+ /** Storage of archive size. */
+ private final SegmentArchiveSizeStorage archiveSizeStorage;
+
+ /** Storage of truncated segments. */
+ private final SegmentTruncateStorage truncateStorage;
+
/**
* Constructor.
*
@@ -52,10 +58,16 @@ public class SegmentAware {
segmentCurrStateStorage = new SegmentCurrentStateStorage(walSegmentsCnt);
segmentCompressStorage = new SegmentCompressStorage(log, compactionEnabled);
+ archiveSizeStorage = new SegmentArchiveSizeStorage();
+ truncateStorage = new SegmentTruncateStorage();
+
segmentArchivedStorage.addObserver(segmentCurrStateStorage::onSegmentArchived);
segmentArchivedStorage.addObserver(segmentCompressStorage::onSegmentArchived);
+ segmentArchivedStorage.addObserver(truncateStorage::lastArchivedIdx);
segmentLockStorage.addObserver(segmentArchivedStorage::onSegmentUnlocked);
+
+ reservationStorage.addObserver(truncateStorage::minReservedIdx);
}
/**
@@ -149,17 +161,21 @@ public class SegmentAware {
}
/**
- * @param lastTruncatedArchiveIdx Last truncated segment;
+ * Update last truncated segment.
+ *
+ * @param absIdx Absolut segment index.
*/
- public void lastTruncatedArchiveIdx(long lastTruncatedArchiveIdx) {
- segmentArchivedStorage.lastTruncatedArchiveIdx(lastTruncatedArchiveIdx);
+ public void lastTruncatedArchiveIdx(long absIdx) {
+ truncateStorage.lastTruncatedIdx(absIdx);
}
/**
- * @return Last truncated segment.
+ * Getting last truncated segment.
+ *
+ * @return Absolut segment index.
*/
public long lastTruncatedArchiveIdx() {
- return segmentArchivedStorage.lastTruncatedArchiveIdx();
+ return truncateStorage.lastTruncatedIdx();
}
/**
@@ -241,6 +257,10 @@ public class SegmentAware {
segmentCompressStorage.reset();
segmentCurrStateStorage.reset();
+
+ archiveSizeStorage.reset();
+
+ truncateStorage.reset();
}
/**
@@ -252,6 +272,10 @@ public class SegmentAware {
segmentCompressStorage.interrupt();
segmentCurrStateStorage.interrupt();
+
+ archiveSizeStorage.interrupt();
+
+ truncateStorage.interrupt();
}
/**
@@ -263,6 +287,10 @@ public class SegmentAware {
segmentCompressStorage.interrupt();
segmentCurrStateStorage.forceInterrupt();
+
+ archiveSizeStorage.interrupt();
+
+ truncateStorage.interrupt();
}
/**
@@ -288,4 +316,64 @@ public class SegmentAware {
public boolean minLockIndex(long absIdx) {
return segmentLockStorage.minLockIndex(absIdx);
}
+
+ /**
+ * Adding current WAL archive size in bytes.
+ *
+ * @param size Size in bytes.
+ */
+ public void addCurrentWalArchiveSize(long size) {
+ archiveSizeStorage.addCurrentSize(size);
+ }
+
+ /**
+ * Adding reserved WAL archive size in bytes.
+ * Defines a hint to determine if the maximum size is exceeded before a new segment is archived.
+ *
+ * @param size Size in bytes.
+ */
+ public void addReservedWalArchiveSize(long size) {
+ archiveSizeStorage.addReservedSize(size);
+ }
+
+ /**
+ * Reset the current and reserved WAL archive sizes.
+ */
+ public void resetWalArchiveSizes() {
+ archiveSizeStorage.resetSizes();
+ }
+
+ /**
+ * Waiting for exceeding the maximum WAL archive size. To track size of WAL archive,
+ * need to use {@link #addCurrentWalArchiveSize} and {@link #addReservedWalArchiveSize}.
+ *
+ * @param max Maximum WAL archive size in bytes.
+ * @throws IgniteInterruptedCheckedException If it was interrupted.
+ */
+ public void awaitExceedMaxArchiveSize(long max) throws IgniteInterruptedCheckedException {
+ archiveSizeStorage.awaitExceedMaxSize(max);
+ }
+
+ /**
+ * Update segment of last completed checkpoint.
+ * Required for binary recovery.
+ *
+ * @param absIdx Absolut segment index.
+ */
+ public void lastCheckpointIdx(long absIdx) {
+ truncateStorage.lastCheckpointIdx(absIdx);
+ }
+
+ /**
+ * Waiting for segment truncation to be available. To get the number of segments available for truncation, use
+ * {@link #lastTruncatedArchiveIdx}, {@link #lastCheckpointIdx}, {@link #reserve} and
+ * {@link #lastArchivedAbsoluteIndex} (to restart the node correctly) and is calculated as
+ * {@code lastTruncatedArchiveIdx} - {@code min(lastCheckpointIdx, reserve, lastArchivedAbsoluteIndex)}.
+ *
+ * @return Number of segments available to truncate.
+ * @throws IgniteInterruptedCheckedException If it was interrupted.
+ */
+ public long awaitAvailableTruncateArchive() throws IgniteInterruptedCheckedException {
+ return truncateStorage.awaitAvailableTruncate();
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
index 5fca1ec..f71dd79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
/**
* Storage of actual information about current index of compressed segments.
*/
-public class SegmentCompressStorage {
+class SegmentCompressStorage {
/** Logger. */
private final IgniteLogger log;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
index a5a7948..189559ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
@@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAhea
/**
* Lock on segment protects from archiving segment.
*/
-public class SegmentLockStorage extends SegmentObservable {
+class SegmentLockStorage extends SegmentObservable {
/**
* Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may come from
* {@link FileWriteAheadLogManager.RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java
index 3e91504..f6cd9b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java
@@ -24,7 +24,7 @@ import java.util.function.Consumer;
/**
* Implementation of observer-observable pattern. For handling specific changes of segment.
*/
-public abstract class SegmentObservable {
+abstract class SegmentObservable {
/** Observers for handle changes of archived index. */
private final Queue<Consumer<Long>> observers = new ConcurrentLinkedQueue<>();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
index 42eece7..453cc17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
@@ -16,13 +16,17 @@
*/
package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
+import java.util.Map;
import java.util.NavigableMap;
+import java.util.Objects;
import java.util.TreeMap;
+import java.util.function.Consumer;
+import org.jetbrains.annotations.Nullable;
/**
* Segment reservations storage: Protects WAL segments from deletion during WAL log cleanup.
*/
-class SegmentReservationStorage {
+class SegmentReservationStorage extends SegmentObservable {
/**
* Maps absolute segment index to reservation counter. If counter > 0 then we wouldn't delete all segments which has
* index >= reserved segment index. Guarded by {@code this}.
@@ -38,14 +42,22 @@ class SegmentReservationStorage {
* @param absIdx Index for reservation.
* @return {@code True} if the reservation was successful.
*/
- synchronized boolean reserve(long absIdx) {
- if (absIdx > minReserveIdx) {
- reserved.merge(absIdx, 1, Integer::sum);
+ boolean reserve(long absIdx) {
+ boolean res = false;
+ Long minReservedIdx = null;
- return true;
+ synchronized (this) {
+ if (absIdx > minReserveIdx) {
+ minReservedIdx = trackingMinReservedIdx(reserved -> reserved.merge(absIdx, 1, Integer::sum));
+
+ res = true;
+ }
}
- return false;
+ if (minReservedIdx != null)
+ notifyObservers(minReservedIdx);
+
+ return res;
}
/**
@@ -61,15 +73,24 @@ class SegmentReservationStorage {
/**
* @param absIdx Reserved index.
*/
- synchronized void release(long absIdx) {
- Integer cur = reserved.get(absIdx);
+ void release(long absIdx) {
+ Long minReservedIdx;
- assert cur != null && cur >= 1 : "cur=" + cur + ", absIdx=" + absIdx;
+ synchronized (this) {
+ minReservedIdx = trackingMinReservedIdx(reserved -> {
+ Integer cur = reserved.get(absIdx);
- if (cur == 1)
- reserved.remove(absIdx);
- else
- reserved.put(absIdx, cur - 1);
+ assert cur != null && cur >= 1 : "cur=" + cur + ", absIdx=" + absIdx;
+
+ if (cur == 1)
+ reserved.remove(absIdx);
+ else
+ reserved.put(absIdx, cur - 1);
+ });
+ }
+
+ if (minReservedIdx != null)
+ notifyObservers(minReservedIdx);
}
/**
@@ -88,4 +109,24 @@ class SegmentReservationStorage {
return true;
}
+
+ /**
+ * Updating {@link #reserved} with tracking changes of minimum reserved segment.
+ *
+ * @param updateFun {@link #reserved} update function.
+ * @return New minimum reserved segment, {@code null} if there are no changes,
+ * {@code -1} if there are no reserved segments.
+ */
+ @Nullable private synchronized Long trackingMinReservedIdx(Consumer<NavigableMap<Long, Integer>> updateFun) {
+ Map.Entry<Long, Integer> oldMinE = reserved.firstEntry();
+
+ updateFun.accept(reserved);
+
+ Map.Entry<Long, Integer> newMinE = reserved.firstEntry();
+
+ Long oldMin = oldMinE == null ? null : oldMinE.getKey();
+ Long newMin = newMinE == null ? null : newMinE.getKey();
+
+ return Objects.equals(oldMin, newMin) ? null : newMin == null ? -1 : newMin;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentTruncateStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentTruncateStorage.java
new file mode 100644
index 0000000..a65ef8a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentTruncateStorage.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
+
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+
+/**
+ * Store the last truncated segment and allows to get the number of segments available for truncation.
+ * We cannot truncate the segments required for {@link #lastCpIdx binary recovery}, {@link #minReservedIdx reserved}
+ * and {@link #lastArchivedIdx last archived} (to restart the node correctly). Thus, we need to take account of these
+ * conditions in the calculation of the number of segments available for truncation.
+ */
+class SegmentTruncateStorage {
+ /** Flag of interrupt waiting on this object. */
+ private volatile boolean interrupted;
+
+ /** Latest truncated segment. */
+ private long lastTruncatedIdx = -1;
+
+ /** Minimum reserved segment. */
+ private long minReservedIdx = -1;
+
+ /** Segment of last completed checkpoint. */
+ private long lastCpIdx = -1;
+
+ /** Last archived segment. */
+ private long lastArchivedIdx = -1;
+
+ /**
+ * Update last truncated segment.
+ *
+ * @param absIdx Absolut segment index.
+ */
+ synchronized void lastTruncatedIdx(long absIdx) {
+ lastTruncatedIdx = absIdx;
+
+ notifyAll();
+ }
+
+ /**
+ * Update minimum reserved segment.
+ * Protected from deletion.
+ *
+ * @param absIdx Absolut segment index.
+ */
+ synchronized void minReservedIdx(long absIdx) {
+ minReservedIdx = absIdx;
+
+ notifyAll();
+ }
+
+ /**
+ * Update segment of last completed checkpoint.
+ * Required for binary recovery.
+ *
+ * @param absIdx Absolut segment index.
+ */
+ synchronized void lastCheckpointIdx(long absIdx) {
+ lastCpIdx = absIdx;
+
+ notifyAll();
+ }
+
+ /**
+ * Update last archived segment.
+ * Needed to restart the node correctly.
+ *
+ * @param absIdx Absolut segment index.
+ */
+ synchronized void lastArchivedIdx(long absIdx) {
+ lastArchivedIdx = absIdx;
+
+ notifyAll();
+ }
+
+ /**
+ * Getting last truncated segment.
+ *
+ * @return Absolut segment index.
+ */
+ synchronized long lastTruncatedIdx() {
+ return lastTruncatedIdx;
+ }
+
+ /**
+ * Waiting for segment truncation to be available. Use {@link #lastTruncatedIdx}, {@link #lastCpIdx},
+ * {@link #minReservedIdx} and {@link #lastArchivedIdx} to determine the number of segments to truncate.
+ *
+ * @return Number of segments available to truncate.
+ * @throws IgniteInterruptedCheckedException If it was interrupted.
+ */
+ synchronized long awaitAvailableTruncate() throws IgniteInterruptedCheckedException {
+ try {
+ while (availableTruncateCnt() == 0 && !interrupted)
+ wait();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedCheckedException(e);
+ }
+
+ if (interrupted)
+ throw new IgniteInterruptedCheckedException("Interrupt waiting for truncation availability");
+
+ return availableTruncateCnt();
+ }
+
+ /**
+ * Interrupt waiting on this object.
+ */
+ synchronized void interrupt() {
+ interrupted = true;
+
+ notifyAll();
+ }
+
+ /**
+ * Resets interrupted flag.
+ */
+ void reset() {
+ interrupted = false;
+ }
+
+ /**
+ * Calculation the number of segments that can be truncated.
+ *
+ * @return Number of segments.
+ */
+ private synchronized long availableTruncateCnt() {
+ long highIdx = minReservedIdx == -1 ? lastCpIdx : Math.min(minReservedIdx, lastCpIdx);
+
+ // Protection against deleting the last segment from WAL archive for correct restart the node.
+ highIdx = lastArchivedIdx == -1 ? highIdx : Math.min(lastArchivedIdx, highIdx);
+
+ return Math.max(0, highIdx == -1 ? 0 : highIdx - (lastTruncatedIdx + 1));
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 3cf5c38..5f5aafe 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -12028,4 +12028,24 @@ public abstract class IgniteUtils {
return sb.toString();
}
+
+ /**
+ * Getting the total size of uncompressed data in zip.
+ *
+ * @param zip Zip file.
+ * @return Total uncompressed size.
+ * @throws IOException If failed.
+ */
+ public static long uncompressedSize(File zip) throws IOException {
+ try (ZipFile zipFile = new ZipFile(zip)) {
+ long size = 0;
+
+ Enumeration<? extends ZipEntry> entries = zipFile.entries();
+
+ while (entries.hasMoreElements())
+ size += entries.nextElement().getSize();
+
+ return size;
+ }
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java
index 9f3eaaf..e66abe3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java
@@ -17,29 +17,33 @@
package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+import java.io.File;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkpointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValueHierarchy;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
*
@@ -66,14 +70,14 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT
Ignite ignite = startGrid(configuration);
- ignite.active(true);
+ ignite.cluster().state(ClusterState.ACTIVE);
return ignite;
}
/** */
- private CacheConfiguration<Integer, Integer> cacheConfiguration() {
- CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+ private CacheConfiguration<Integer, Object> cacheConfiguration() {
+ CacheConfiguration<Integer, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
return ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
}
@@ -135,37 +139,36 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT
public void testCorrectDeletedArchivedWalFiles() throws Exception {
//given: configured grid with setted max wal archive size
long maxWalArchiveSize = 2 * 1024 * 1024;
- Ignite ignite = startGrid(dbCfg -> {
- dbCfg.setMaxWalArchiveSize(maxWalArchiveSize);
- });
+ Ignite ignite = startGrid(dbCfg -> dbCfg.setMaxWalArchiveSize(maxWalArchiveSize));
GridCacheDatabaseSharedManager dbMgr = gridDatabase(ignite);
- long allowedThresholdWalArchiveSize = maxWalArchiveSize / 2;
+ CheckpointHistory hist = dbMgr.checkpointHistory();
+ assertNotNull(hist);
- IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheConfiguration());
+ IgniteCache<Integer, Object> cache = ignite.getOrCreateCache(cacheConfiguration());
//when: put to cache more than 2 MB
- for (int i = 0; i < 500; i++)
+ for (int i = 0; i < 500; i++) {
+ if (i % 100 == 0)
+ forceCheckpoint();
+
cache.put(i, i);
+ }
- forceCheckpoint();
+ //then: total archive size less than of maxWalArchiveSize(by current logic)
+ FileWriteAheadLogManager wal = wal(ignite);
- //then: total archive size less than half of maxWalArchiveSize(by current logic)
- IgniteWriteAheadLogManager wal = wal(ignite);
+ assertTrue(waitForCondition(() -> wal.lastTruncatedSegment() >= 0, 10_000));
- FileDescriptor[] files = (FileDescriptor[])U.findNonPublicMethod(wal.getClass(), "walArchiveFiles").invoke(wal);
+ FileDescriptor[] files = wal.walArchiveFiles();
- Long totalSize = Stream.of(files)
- .map(desc -> desc.file().length())
- .reduce(0L, Long::sum);
+ long totalSize = wal.totalSize(files);
assertTrue(files.length >= 1);
- assertTrue(totalSize <= allowedThresholdWalArchiveSize);
+ assertTrue(totalSize < maxWalArchiveSize);
assertFalse(Stream.of(files).anyMatch(desc -> desc.file().getName().endsWith("00001.wal")));
- CheckpointHistory hist = dbMgr.checkpointHistory();
-
assertTrue(!hist.checkpoints().isEmpty());
}
@@ -175,13 +178,11 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT
@Test
public void testCheckpointStarted_WhenWalHasTooBigSizeWithoutCheckpoint() throws Exception {
//given: configured grid with max wal archive size = 1MB, wal segment size = 512KB
- Ignite ignite = startGrid(dbCfg -> {
- dbCfg.setMaxWalArchiveSize(1 * 1024 * 1024);// 1 Mbytes
- });
+ Ignite ignite = startGrid(dbCfg -> dbCfg.setMaxWalArchiveSize(1024 * 1024));
GridCacheDatabaseSharedManager dbMgr = gridDatabase(ignite);
- IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheConfiguration());
+ IgniteCache<Integer, Object> cache = ignite.getOrCreateCache(cacheConfiguration());
for (int i = 0; i < 500; i++)
cache.put(i, i);
@@ -195,20 +196,63 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT
}
/**
- * Correct delete checkpoint history from memory depends on IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE. WAL files
- * doesn't delete because deleting was disabled.
+ * Test for check deprecated removing checkpoint by deprecated walHistorySize parameter
+ *
+ * @deprecated Test old removing process depends on WalHistorySize.
+ */
+ @Test
+ public void testCheckpointHistoryRemovingByTruncate() throws Exception {
+ Ignite ignite = startGrid(dbCfg -> dbCfg.setMaxWalArchiveSize(2 * 1024 * 1024));
+
+ GridCacheDatabaseSharedManager dbMgr = gridDatabase(ignite);
+
+ IgniteCache<Integer, Object> cache = ignite.getOrCreateCache(cacheConfiguration());
+
+ CheckpointHistory hist = dbMgr.checkpointHistory();
+ assertNotNull(hist);
+
+ int startHistSize = hist.checkpoints().size();
+
+ int checkpointCnt = 10;
+
+ for (int i = 0; i < checkpointCnt; i++) {
+ cache.put(i, i);
+ //and: wait for checkpoint finished
+ forceCheckpoint();
+ // Check that the history is growing.
+ assertEquals(startHistSize + (i + 1), hist.checkpoints().size());
+ }
+
+ // Ensure rollover and wal archive cleaning.
+ for (int i = 0; i < 6; i++)
+ cache.put(i, new byte[ignite.configuration().getDataStorageConfiguration().getWalSegmentSize() / 2]);
+
+ FileWriteAheadLogManager wal = wal(ignite);
+ assertTrue(waitForCondition(() -> wal.lastTruncatedSegment() >= 0, 10_000));
+
+ assertTrue(hist.checkpoints().size() < checkpointCnt + startHistSize);
+
+ File[] cpFiles = dbMgr.checkpointDirectory().listFiles();
+
+ assertTrue(cpFiles.length <= (checkpointCnt * 2 + 1));// starts & ends + node_start
+ }
+
+ /**
+ * Correct delete checkpoint history from memory depends on IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE.
+ * WAL files doesn't delete because deleting was disabled.
*/
@Test
@WithSystemProperty(key = IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, value = "2")
public void testCorrectDeletedCheckpointHistoryButKeepWalFiles() throws Exception {
//given: configured grid with disabled WAL removing.
- Ignite ignite = startGrid(dbCfg -> {
- dbCfg.setMaxWalArchiveSize(DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE);
- });
+ Ignite ignite = startGrid(dbCfg -> dbCfg.setMaxWalArchiveSize(DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE));
GridCacheDatabaseSharedManager dbMgr = gridDatabase(ignite);
- IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheConfiguration());
+ CheckpointHistory hist = dbMgr.checkpointHistory();
+ assertNotNull(hist);
+
+ IgniteCache<Integer, Object> cache = ignite.getOrCreateCache(cacheConfiguration());
//when: put to cache
for (int i = 0; i < 500; i++) {
@@ -221,16 +265,12 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT
forceCheckpoint();
//then: WAL files was not deleted but some of checkpoint history was deleted.
- IgniteWriteAheadLogManager wal = wal(ignite);
-
- FileDescriptor[] files = (FileDescriptor[])U.findNonPublicMethod(wal.getClass(), "walArchiveFiles").invoke(wal);
+ FileWriteAheadLogManager wal = wal(ignite);
+ assertNull(getFieldValueHierarchy(wal, "cleaner"));
- boolean hasFirstSegment = Stream.of(files)
- .anyMatch(desc -> desc.file().getName().endsWith("0001.wal"));
+ FileDescriptor[] files = wal.walArchiveFiles();
- assertTrue(hasFirstSegment);
-
- CheckpointHistory hist = dbMgr.checkpointHistory();
+ assertTrue(Stream.of(files).anyMatch(desc -> desc.file().getName().endsWith("0001.wal")));
assertTrue(hist.checkpoints().size() == 2);
}
@@ -245,7 +285,7 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT
/**
* Extract IgniteWriteAheadLogManager.
*/
- private IgniteWriteAheadLogManager wal(Ignite ignite) {
- return ((IgniteEx)ignite).context().cache().context().wal();
+ private FileWriteAheadLogManager wal(Ignite ignite) {
+ return (FileWriteAheadLogManager)((IgniteEx)ignite).context().cache().context().wal();
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index 10634cc..035f289 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -185,11 +185,6 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager {
}
/** {@inheritDoc} */
- @Override public long maxArchivedSegmentToDelete() {
- return -1;
- }
-
- /** {@inheritDoc} */
@Override public long segmentSize(long idx) {
return -1;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
index 5037ca3..18291c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
@@ -680,6 +680,101 @@ public class SegmentAwareTest {
}
/**
+ * Checking the correctness of WAL archive size.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testWalArchiveSize() throws Exception {
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+
+ IgniteInternalFuture<?> fut = awaitThread(() -> aware.awaitExceedMaxArchiveSize(10));
+
+ aware.addCurrentWalArchiveSize(4);
+ assertFutureIsNotFinish(fut);
+
+ aware.addReservedWalArchiveSize(4);
+ assertFutureIsNotFinish(fut);
+
+ aware.addCurrentWalArchiveSize(4);
+ fut.get(20);
+
+ aware.resetWalArchiveSizes();
+
+ fut = awaitThread(() -> aware.awaitExceedMaxArchiveSize(10));
+
+ aware.addCurrentWalArchiveSize(4);
+ assertFutureIsNotFinish(fut);
+
+ aware.addReservedWalArchiveSize(4);
+ assertFutureIsNotFinish(fut);
+
+ aware.addReservedWalArchiveSize(4);
+ fut.get(20);
+
+ aware.resetWalArchiveSizes();
+
+ fut = awaitThread(() -> aware.awaitExceedMaxArchiveSize(10));
+
+ aware.interrupt();
+ assertTrue(fut.get(20) instanceof IgniteInterruptedCheckedException);
+
+ aware.reset();
+
+ fut = awaitThread(() -> aware.awaitExceedMaxArchiveSize(10));
+
+ aware.forceInterrupt();
+ assertTrue(fut.get(20) instanceof IgniteInterruptedCheckedException);
+ }
+
+ /**
+ * Checking the correctness of truncate logic.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTruncate() throws Exception {
+ SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+
+ IgniteInternalFuture<?> fut = awaitThread(aware::awaitAvailableTruncateArchive);
+
+ aware.lastCheckpointIdx(5);
+
+ fut.get(20);
+ assertEquals(5, aware.awaitAvailableTruncateArchive());
+
+ aware.reserve(4);
+ assertEquals(4, aware.awaitAvailableTruncateArchive());
+
+ aware.setLastArchivedAbsoluteIndex(3);
+ assertEquals(3, aware.awaitAvailableTruncateArchive());
+
+ aware.lastTruncatedArchiveIdx(0);
+ assertEquals(2, aware.awaitAvailableTruncateArchive());
+ assertEquals(0, aware.lastTruncatedArchiveIdx());
+
+ aware.reserve(0);
+ fut = awaitThread(aware::awaitAvailableTruncateArchive);
+
+ aware.release(0);
+
+ fut.get(20);
+ assertEquals(2, aware.awaitAvailableTruncateArchive());
+
+ aware.setLastArchivedAbsoluteIndex(4);
+ assertEquals(3, aware.awaitAvailableTruncateArchive());
+
+ aware.release(4);
+ assertEquals(3, aware.awaitAvailableTruncateArchive());
+
+ aware.lastCheckpointIdx(6);
+ assertEquals(3, aware.awaitAvailableTruncateArchive());
+
+ aware.setLastArchivedAbsoluteIndex(6);
+ assertEquals(5, aware.awaitAvailableTruncateArchive());
+ }
+
+ /**
* Assert that future is still not finished.
*
* @param future Future to check.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 2d38439..3f1cf07 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -25,6 +25,7 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
@@ -39,6 +40,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -55,6 +57,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -90,6 +93,7 @@ import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.readResource;
import static org.junit.Assert.assertArrayEquals;
@@ -1424,6 +1428,34 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
}
/**
+ * Test to verify the {@link U#uncompressedSize}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUncompressedSize() throws Exception {
+ File zipFile = new File(System.getProperty("java.io.tmpdir"), "test.zip");
+
+ try {
+ assertThrows(log, () -> U.uncompressedSize(zipFile), IOException.class, null);
+
+ byte[] raw = IntStream.range(0, 10).mapToObj(i -> zipFile.getAbsolutePath() + i)
+ .collect(joining()).getBytes(StandardCharsets.UTF_8);
+
+ try (FileOutputStream fos = new FileOutputStream(zipFile)) {
+ fos.write(U.zip(raw));
+
+ fos.flush();
+ }
+
+ assertEquals(raw.length, U.uncompressedSize(zipFile));
+ }
+ finally {
+ assertTrue(U.delete(zipFile));
+ }
+ }
+
+ /**
* Reading lines from a resource file and passing them to consumer.
* If read string is {@code "null"}, it is converted to {@code null}.
*