You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by il...@apache.org on 2020/12/11 12:29:53 UTC
[ignite] branch master updated: IGNITE-13815 Remove ability to
delete segments from the middle of WAL archive - Fixes #8545.
This is an automated email from the ASF dual-hosted git repository.
ilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 28b15f1 IGNITE-13815 Remove ability to delete segments from the middle of WAL archive - Fixes #8545.
28b15f1 is described below
commit 28b15f134c260436bd705acd646e611000dae180
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Fri Dec 11 15:28:47 2020 +0300
IGNITE-13815 Remove ability to delete segments from the middle of WAL archive - Fixes #8545.
Signed-off-by: Ilya Kasnacheev <il...@gmail.com>
---
.../pagemem/wal/IgniteWriteAheadLogManager.java | 11 +-
.../persistence/checkpoint/CheckpointHistory.java | 2 +-
.../persistence/wal/FileWriteAheadLogManager.java | 345 ++++++++++-----------
.../wal/aware/SegmentArchivedStorage.java | 17 +-
.../cache/persistence/wal/aware/SegmentAware.java | 92 +++---
.../wal/aware/SegmentCompressStorage.java | 42 +--
.../wal/aware/SegmentCurrentStateStorage.java | 84 +++--
.../persistence/wal/aware/SegmentLockStorage.java | 39 ++-
.../wal/aware/SegmentReservationStorage.java | 35 ++-
.../persistence/wal/io/LockedReadFileInput.java | 27 +-
.../ignite/internal/visor/misc/VisorWalTask.java | 2 +-
.../db/IgnitePdsReserveWalSegmentsTest.java | 148 ++++++---
.../db/wal/IgniteWalIteratorSwitchSegmentTest.java | 171 +++++-----
.../cache/persistence/pagemem/NoOpWALManager.java | 2 +-
.../persistence/wal/aware/SegmentAwareTest.java | 73 ++++-
15 files changed, 602 insertions(+), 488 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index cb4fc30..f3d85c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -146,15 +146,14 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni
public void release(WALPointer start) throws IgniteCheckedException;
/**
- * Gives a hint to WAL manager to clear entries logged before the given pointer. Some entries before the
- * the given pointer will be kept because there is a configurable WAL history size. Those entries may be used
- * for partial partition rebalancing.
+ * Gives a hint to WAL manager to clear entries logged before the given pointer.
+ * If entries are needed for binary recovery, they will not be affected.
+ * Some entries may be reserved eg for historical rebalance and they also will not be affected.
*
- * @param low Pointer since which WAL will be truncated. If null, WAL will be truncated from the oldest segment.
- * @param high Pointer for which it is safe to clear the log.
+ * @param high Upper border to which WAL segments will be deleted.
* @return Number of deleted WAL segments.
*/
- public int truncate(WALPointer low, WALPointer high);
+ public int truncate(@Nullable WALPointer high);
/**
* Notifies {@code this} about latest checkpoint pointer.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
index 1112579..ed28d95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
@@ -378,7 +378,7 @@ public class CheckpointHistory {
int deleted = 0;
if (truncateWalOnCpFinish)
- deleted += wal.truncate(null, firstCheckpointPointer());
+ deleted += wal.truncate(firstCheckpointPointer());
chp.walFilesDeleted(deleted);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 34db1dee..7a24fa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -290,8 +290,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** Failure processor */
private final FailureProcessor failureProcessor;
- /** */
- private IgniteConfiguration igCfg;
+ /** Ignite configuration. */
+ private final IgniteConfiguration igCfg;
/** Persistence metrics tracker. */
private DataStorageMetricsImpl metrics;
@@ -400,7 +400,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*/
private final Map<Long, Long> segmentSize = new ConcurrentHashMap<>();
+ /** Pointer to the last successful checkpoint until which WAL segments can be safely deleted. */
+ private volatile WALPointer lastCheckpointPtr = new WALPointer(0, 0, 0);
+
/**
+ * Constructor.
+ *
* @param ctx Kernal context.
*/
public FileWriteAheadLogManager(final GridKernalContext ctx) {
@@ -428,8 +433,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
fileHandleManagerFactory = new FileHandleManagerFactory(dsCfg);
maxSegCountWithoutCheckpoint =
- (long)((U.adjustedWalHistorySize(dsCfg, log) * CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE)
- / dsCfg.getWalSegmentSize());
+ (long)((U.adjustedWalHistorySize(dsCfg, log) * CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE)
+ / dsCfg.getWalSegmentSize());
switchSegmentRecordOffset = isArchiverEnabled() ? new AtomicLongArray(dsCfg.getWalSegments()) : null;
}
@@ -973,7 +978,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
log,
segmentAware,
segmentRouter,
- lockedSegmentFileInputFactory);
+ lockedSegmentFileInputFactory
+ );
try {
iter.init(); // Make sure iterator is closed on any error.
@@ -989,25 +995,27 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** {@inheritDoc} */
@Override public boolean reserve(WALPointer start) {
- assert start != null : "Invalid start pointer: " + start;
+ assert start != null;
if (mode == WALMode.NONE)
return false;
- segmentAware.reserve(start.index());
+ // Protection from deletion.
+ boolean reserved = segmentAware.reserve(start.index());
- if (!hasIndex(start.index())) {
- segmentAware.release(start.index());
+ // Segment presence check.
+ if (reserved && !hasIndex(start.index())) {
+ segmentAware.reserve(start.index());
- return false;
+ reserved = false;
}
- return true;
+ return reserved;
}
/** {@inheritDoc} */
@Override public void release(WALPointer start) {
- assert start != null : "Invalid start pointer: " + start;
+ assert start != null;
if (mode == WALMode.NONE)
return;
@@ -1016,16 +1024,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * @param absIdx Absolulte index to check.
- * @return {@code true} if has this index.
+ * Checking for the existence of an index.
+ *
+ * @param absIdx Segment index.
+ * @return {@code True} exists.
*/
private boolean hasIndex(long absIdx) {
String segmentName = fileName(absIdx);
- String zipSegmentName = segmentName + ZIP_SUFFIX;
-
boolean inArchive = new File(walArchiveDir, segmentName).exists() ||
- new File(walArchiveDir, zipSegmentName).exists();
+ new File(walArchiveDir, segmentName + ZIP_SUFFIX).exists();
if (inArchive)
return true;
@@ -1039,30 +1047,25 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/** {@inheritDoc} */
- @Override public int truncate(WALPointer low, WALPointer high) {
+ @Override public int truncate(@Nullable WALPointer high) {
if (high == null)
return 0;
- // File pointer bound: older entries will be deleted from archive
-
- FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
+ FileDescriptor[] descs = walArchiveFiles();
int deleted = 0;
for (FileDescriptor desc : descs) {
- if (low != null && desc.idx < low.index())
- continue;
-
- // Do not delete reserved or locked segment and any segment after it.
- if (segmentReservedOrLocked(desc.idx))
- return deleted;
-
long archivedAbsIdx = segmentAware.lastArchivedAbsoluteIndex();
long lastArchived = archivedAbsIdx >= 0 ? archivedAbsIdx : lastArchivedIndex();
- // We need to leave at least one archived segment to correctly determine the archive index.
- if (desc.idx < high.index() && desc.idx < lastArchived) {
+ if (desc.idx >= lastCheckpointPtr.index() // We cannot delete segments needed for binary recovery.
+ || desc.idx >= lastArchived // We cannot delete last segment, it is needed at start of node and avoid gaps.
+ || !segmentAware.minReserveIndex(desc.idx)) // We cannot delete reserved segment.
+ return deleted;
+
+ if (desc.idx < high.index()) {
if (!desc.file.delete()) {
U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
desc.file.getAbsolutePath());
@@ -1099,8 +1102,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** {@inheritDoc} */
@Override public void notchLastCheckpointPtr(WALPointer ptr) {
- if (compressor != null)
- segmentAware.keepUncompressedIdxFrom(ptr.index());
+ lastCheckpointPtr = ptr;
}
/** {@inheritDoc} */
@@ -1117,9 +1119,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (lastArchived == -1)
return 0;
- int res = (int)(lastArchived - lastTruncated);
-
- return res >= 0 ? res : 0;
+ return Math.max((int)(lastArchived - lastTruncated), 0);
}
/** {@inheritDoc} */
@@ -1369,9 +1369,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
FileWriteHandle hnd = fileHandleManager.initHandle(fileIO, off + len, ser);
- if (archiver0 != null)
- segmentAware.curAbsWalIdx(absIdx);
- else
+ segmentAware.curAbsWalIdx(absIdx);
+
+ FileDescriptor[] walArchiveFiles = walArchiveFiles();
+
+ segmentAware.minReserveIndex(F.isEmpty(walArchiveFiles) ? -1 : walArchiveFiles[0].idx - 1);
+
+ if (archiver0 == null)
segmentAware.setLastArchivedAbsoluteIndex(absIdx - 1);
// Getting segment sizes.
@@ -1494,17 +1498,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @throws StorageException If failed.
*/
private void checkOrPrepareFiles() throws StorageException {
- // Clean temp files.
- {
- File[] tmpFiles = walWorkDir.listFiles(WAL_SEGMENT_TEMP_FILE_FILTER);
-
- if (!F.isEmpty(tmpFiles)) {
- for (File tmp : tmpFiles) {
- if (!tmp.delete()) {
- throw new StorageException("Failed to delete previously created temp file " +
- "(make sure Ignite process has enough rights): " + tmp.getAbsolutePath());
- }
- }
+ Collection<File> tmpFiles = new HashSet<>();
+
+ for (File walDir : F.asList(walWorkDir, walArchiveDir)) {
+ tmpFiles.addAll(F.asList(walDir.listFiles(WAL_SEGMENT_TEMP_FILE_FILTER)));
+ tmpFiles.addAll(F.asList(walDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER)));
+ }
+
+ for (File tmpFile : tmpFiles) {
+ if (tmpFile.exists() && !tmpFile.delete()) {
+ throw new StorageException("Failed to delete previously created temp file " +
+ "(make sure Ignite process has enough rights): " + tmpFile.getAbsolutePath());
}
}
@@ -1605,6 +1609,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
FileArchiver archiver0 = archiver;
if (archiver0 == null) {
+ segmentAware.curAbsWalIdx(curIdx + 1);
segmentAware.setLastArchivedAbsoluteIndex(curIdx);
return new File(walWorkDir, fileName(curIdx + 1));
@@ -1634,7 +1639,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * Files from archive WAL directory.
+ * Files from {@link #walArchiveDir}.
+ *
+ * @return Raw or compressed WAL segments from archive.
*/
private FileDescriptor[] walArchiveFiles() {
return scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
@@ -1985,22 +1992,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/**
- * @param absIdx Segment absolute index.
- * @return <ul><li>{@code True} if can read, no lock is held, </li><li>{@code false} if work segment, need
- * release segment later, use {@link #releaseWorkSegment} for unlock</li> </ul>
- */
- public boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) {
- return segmentAware.checkCanReadArchiveOrReserveWorkSegment(absIdx);
- }
-
- /**
- * @param absIdx Segment absolute index.
- */
- public void releaseWorkSegment(long absIdx) {
- segmentAware.releaseWorkSegment(absIdx);
- }
-
- /**
* Moves WAL segment from work folder to archive folder. Temp file is used to do movement.
*
* @param absIdx Absolute index to archive.
@@ -2081,18 +2072,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
checkFiles(
1,
true,
- new IgnitePredicate<Integer>() {
- @Override public boolean apply(Integer integer) {
- return !checkStop();
- }
- },
- new CI1<Integer>() {
- @Override public void apply(Integer idx) {
- synchronized (FileArchiver.this) {
- formatted = idx;
+ (IgnitePredicate<Integer>)integer -> !checkStop(),
+ (CI1<Integer>)idx -> {
+ synchronized (FileArchiver.this) {
+ formatted = idx;
- FileArchiver.this.notifyAll();
- }
+ FileArchiver.this.notifyAll();
}
}
);
@@ -2131,15 +2116,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** */
private void init() {
- File[] toDel = walArchiveDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER);
-
- for (File f : toDel) {
- if (isCancelled())
- return;
-
- f.delete();
- }
-
for (int i = 1; i < calculateThreadCount(); i++) {
FileCompressorWorker worker = new FileCompressorWorker(i, log);
@@ -2400,7 +2376,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (segmentReservedOrLocked(desc.idx))
return;
- if (desc.idx < segmentAware.keepUncompressedIdxFrom() && duplicateIndices.contains(desc.idx)) {
+ if (desc.idx < lastCheckpointPtr.index() && duplicateIndices.contains(desc.idx)) {
if (desc.file.exists() && !desc.file.delete()) {
U.warn(log, "Failed to remove obsolete WAL segment " +
"(make sure the process has enough rights): " + desc.file.getAbsolutePath() +
@@ -2416,13 +2392,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*/
private class FileDecompressor extends GridWorker {
/** Decompression futures. */
- private Map<Long, GridFutureAdapter<Void>> decompressionFutures = new HashMap<>();
+ private final Map<Long, GridFutureAdapter<Void>> decompressionFutures = new HashMap<>();
/** Segments queue. */
private final PriorityBlockingQueue<Long> segmentsQueue = new PriorityBlockingQueue<>();
/** Byte array for draining data. */
- private byte[] arr = new byte[BUF_SIZE];
+ private final byte[] arr = new byte[BUF_SIZE];
/**
* @param log Logger.
@@ -2730,18 +2706,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
private final DataStorageConfiguration dsCfg;
/** Optional start pointer. */
- @Nullable
- private WALPointer start;
+ @Nullable private final WALPointer start;
/** Optional end pointer. */
- @Nullable
- private WALPointer end;
+ @Nullable private final WALPointer end;
/** Manager of segment location. */
- private SegmentRouter segmentRouter;
+ private final SegmentRouter segmentRouter;
/** Holder of actual information of latest manipulation on WAL segments. */
- private SegmentAware segmentAware;
+ private final SegmentAware segmentAware;
/**
* @param cctx Shared context.
@@ -2756,10 +2730,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @param log Logger @throws IgniteCheckedException If failed to initialize WAL segment.
* @param segmentAware Segment aware.
* @param segmentRouter Segment router.
- * @param segmentFileInputFactory
+ * @param segmentFileInputFactory Factory to provide I/O interfaces for read primitives with files.
*/
private RecordsIterator(
- GridCacheSharedContext cctx,
+ GridCacheSharedContext<?, ?> cctx,
File walArchiveDir,
File walWorkDir,
@Nullable WALPointer start,
@@ -2774,13 +2748,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
SegmentRouter segmentRouter,
SegmentFileInputFactory segmentFileInputFactory
) throws IgniteCheckedException {
- super(log,
+ super(
+ log,
cctx,
serializerFactory,
ioFactory,
dsCfg.getWalRecordIteratorBufferSize(),
segmentFileInputFactory
);
+
this.walArchiveDir = walArchiveDir;
this.walWorkDir = walWorkDir;
this.archiver = archiver;
@@ -2890,57 +2866,70 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
curWalSegmIdx++;
- boolean readArchive = canReadArchiveOrReserveWork(curWalSegmIdx); //lock during creation handle.
+ // Segment deletion protection.
+ if (!segmentAware.reserve(curWalSegmIdx))
+ throw new IgniteCheckedException("Segment does not exist: " + curWalSegmIdx);
- FileDescriptor fd = null;
- ReadFileHandle nextHandle;
try {
- fd = segmentRouter.findSegment(curWalSegmIdx);
+ // Protection against transferring a segment to the archive by #archiver.
+ boolean readArchive = archiver != null && !segmentAware.lock(curWalSegmIdx);
- if (log.isDebugEnabled())
- log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.file.getAbsolutePath() + ']');
+ FileDescriptor fd = null;
+ ReadFileHandle nextHandle;
+ try {
+ fd = segmentRouter.findSegment(curWalSegmIdx);
- nextHandle = initReadHandle(fd, start != null && curWalSegmIdx == start.index() ? start : null);
- }
- catch (FileNotFoundException e) {
- if (readArchive)
- throw new IgniteCheckedException("Missing WAL segment in the archive", e);
- else {
- // Log only when no segments were read. This will help us avoiding logging on the end of the WAL.
- if (curRec == null && curWalSegment == null) {
- File workDirFile = new File(walWorkDir, fileName(curWalSegmIdx % dsCfg.getWalSegments()));
- File archiveDirFile = new File(walArchiveDir, fileName(curWalSegmIdx));
-
- U.warn(
- log,
- "Next segment file is not found [" +
- "curWalSegmIdx=" + curWalSegmIdx
- + ", start=" + start
- + ", end=" + end
- + ", filePath=" + (fd == null ? "<empty>" : fd.file.getAbsolutePath())
- + ", walWorkDir=" + walWorkDir
- + ", walWorkDirContent=" + listFileNames(walWorkDir)
- + ", walArchiveDir=" + walArchiveDir
- + ", walArchiveDirContent=" + listFileNames(walArchiveDir)
- + ", workDirFile=" + workDirFile.getName()
- + ", exists=" + workDirFile.exists()
- + ", archiveDirFile=" + archiveDirFile.getName()
- + ", exists=" + archiveDirFile.exists()
- + "]",
- e
- );
+ if (log.isDebugEnabled()) {
+ log.debug("Reading next file [absIdx=" + curWalSegmIdx +
+ ", file=" + fd.file.getAbsolutePath() + ']');
}
- nextHandle = null;
+ nextHandle = initReadHandle(fd, start != null && curWalSegmIdx == start.index() ? start : null);
}
- }
+ catch (FileNotFoundException e) {
+ if (readArchive)
+ throw new IgniteCheckedException("Missing WAL segment in the archive: " + curWalSegment, e);
+ else {
+ // Log only when no segments were read. This will help us avoiding logging on the end of the WAL.
+ if (curRec == null && curWalSegment == null) {
+ File workDirFile = new File(walWorkDir, fileName(curWalSegmIdx % dsCfg.getWalSegments()));
+ File archiveDirFile = new File(walArchiveDir, fileName(curWalSegmIdx));
+
+ U.warn(
+ log,
+ "Next segment file is not found [" +
+ "curWalSegmIdx=" + curWalSegmIdx
+ + ", start=" + start
+ + ", end=" + end
+ + ", filePath=" + (fd == null ? "<empty>" : fd.file.getAbsolutePath())
+ + ", walWorkDir=" + walWorkDir
+ + ", walWorkDirContent=" + listFileNames(walWorkDir)
+ + ", walArchiveDir=" + walArchiveDir
+ + ", walArchiveDirContent=" + listFileNames(walArchiveDir)
+ + ", workDirFile=" + workDirFile.getName()
+ + ", exists=" + workDirFile.exists()
+ + ", archiveDirFile=" + archiveDirFile.getName()
+ + ", exists=" + archiveDirFile.exists()
+ + "]",
+ e
+ );
+ }
- if (!readArchive)
- releaseWorkSegment(curWalSegmIdx);
+ nextHandle = null;
+ }
+ }
+ finally {
+ if (archiver != null && !readArchive)
+ segmentAware.unlock(curWalSegmIdx);
+ }
- curRec = null;
+ curRec = null;
- return nextHandle;
+ return nextHandle;
+ }
+ finally {
+ segmentAware.release(curWalSegmIdx);
+ }
}
/** */
@@ -2955,63 +2944,47 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** {@inheritDoc} */
@Override protected IgniteCheckedException handleRecordException(Exception e, @Nullable WALPointer ptr) {
- if (e instanceof IgniteCheckedException)
- if (X.hasCause(e, IgniteDataIntegrityViolationException.class))
- // This means that there is no explicit last sengment, so we iterate unil the very end.
- if (end == null) {
- long nextWalSegmentIdx = curWalSegmIdx + 1;
-
- if (!isArchiverEnabled())
- if (canIgnoreCrcError(nextWalSegmentIdx, nextWalSegmentIdx, e, ptr))
- return null;
-
+ if (e instanceof IgniteCheckedException && X.hasCause(e, IgniteDataIntegrityViolationException.class)) {
+ // This means that there is no explicit last segment, so we iterate until the very end.
+ if (end == null) {
+ long nextWalSegmentIdx = curWalSegmIdx + 1;
+
+ if (archiver == null) {
+ if (canIgnoreCrcError(nextWalSegmentIdx, nextWalSegmentIdx, e, ptr))
+ return null;
+ }
+ else {
// Check that we should not look this segment up in archive directory.
// Basically the same check as in "advanceSegment" method.
- if (isArchiverEnabled() && archiver != null)
- if (!canReadArchiveOrReserveWork(nextWalSegmentIdx))
- try {
- long workIdx = nextWalSegmentIdx % dsCfg.getWalSegments();
- if (canIgnoreCrcError(workIdx, nextWalSegmentIdx, e, ptr))
- return null;
- }
- finally {
- releaseWorkSegment(nextWalSegmentIdx);
+ // Segment deletion protection.
+ if (segmentAware.reserve(nextWalSegmentIdx)) {
+ try {
+ // Protection against transferring a segment to the archive by #archiver.
+ if (segmentAware.lock(nextWalSegmentIdx)) {
+ try {
+ long workIdx = nextWalSegmentIdx % dsCfg.getWalSegments();
+
+ if (canIgnoreCrcError(workIdx, nextWalSegmentIdx, e, ptr))
+ return null;
+ }
+ finally {
+ segmentAware.unlock(nextWalSegmentIdx);
+ }
}
+ }
+ finally {
+ segmentAware.release(nextWalSegmentIdx);
+ }
+ }
}
+ }
+ }
return super.handleRecordException(e, ptr);
}
/**
- * @param absIdx Absolute index to check.
- * @return <ul><li> {@code True} if we can safely read the archive, </li> <li>{@code false} if the segment has
- * not been archived yet. In this case the corresponding work segment is reserved (will not be deleted until
- * release). Use {@link #releaseWorkSegment} for unlock </li></ul>
- */
- private boolean canReadArchiveOrReserveWork(long absIdx) {
- return archiver != null && archiver.checkCanReadArchiveOrReserveWorkSegment(absIdx);
- }
-
- /**
- * @param absIdx Absolute index to release.
- */
- private void releaseWorkSegment(long absIdx) {
- if (archiver != null)
- archiver.releaseWorkSegment(absIdx);
- }
-
- /**
- * Check that archiver is enabled
- */
- private boolean isArchiverEnabled() {
- if (walArchiveDir != null && walWorkDir != null)
- return !walArchiveDir.equals(walWorkDir);
-
- return !new File(dsCfg.getWalArchivePath()).equals(new File(dsCfg.getWalPath()));
- }
-
- /**
* @param workIdx Work index.
* @param walSegmentIdx Wal segment index.
* @param e Exception.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
index 438b922..53b3b59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
@@ -43,22 +43,11 @@ class SegmentArchivedStorage extends SegmentObservable {
/**
* @param segmentLockStorage Protects WAL work segments from moving.
*/
- private SegmentArchivedStorage(SegmentLockStorage segmentLockStorage) {
+ SegmentArchivedStorage(SegmentLockStorage segmentLockStorage) {
this.segmentLockStorage = segmentLockStorage;
}
/**
- * @param segmentLockStorage Protects WAL work segments from moving.
- */
- static SegmentArchivedStorage buildArchivedStorage(SegmentLockStorage segmentLockStorage) {
- SegmentArchivedStorage archivedStorage = new SegmentArchivedStorage(segmentLockStorage);
-
- segmentLockStorage.addObserver(archivedStorage::onSegmentUnlocked);
-
- return archivedStorage;
- }
-
- /**
* @return Last archived segment absolute index.
*/
long lastArchivedAbsoluteIndex() {
@@ -105,7 +94,7 @@ class SegmentArchivedStorage extends SegmentObservable {
*/
synchronized void markAsMovedToArchive(long toArchive) throws IgniteInterruptedCheckedException {
try {
- while (segmentLockStorage.locked(toArchive) && !interrupted)
+ while (!segmentLockStorage.minLockIndex(toArchive) && !interrupted)
wait();
}
catch (InterruptedException e) {
@@ -145,7 +134,7 @@ class SegmentArchivedStorage extends SegmentObservable {
/**
* Callback for waking up waiters of this object when unlocked happened.
*/
- private synchronized void onSegmentUnlocked(long segmentId) {
+ synchronized void onSegmentUnlocked(long segmentId) {
notifyAll();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
index be60895..89523db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
@@ -19,10 +19,6 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentArchivedStorage.buildArchivedStorage;
-import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentCompressStorage.buildCompressStorage;
-import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentCurrentStateStorage.buildCurrentStateStorage;
-
/**
* Holder of actual information of latest manipulation on WAL segments.
*/
@@ -34,7 +30,7 @@ public class SegmentAware {
private final SegmentLockStorage segmentLockStorage = new SegmentLockStorage();
/** Manages last archived index, emulates archivation in no-archiver mode. */
- private final SegmentArchivedStorage segmentArchivedStorage = buildArchivedStorage(segmentLockStorage);
+ private final SegmentArchivedStorage segmentArchivedStorage;
/** Storage of actual information about current index of compressed segments. */
private final SegmentCompressStorage segmentCompressStorage;
@@ -43,12 +39,21 @@ public class SegmentAware {
private final SegmentCurrentStateStorage segmentCurrStateStorage;
/**
+ * Constructor.
+ *
* @param walSegmentsCnt Total WAL segments count.
* @param compactionEnabled Is wal compaction enabled.
*/
public SegmentAware(int walSegmentsCnt, boolean compactionEnabled) {
- segmentCurrStateStorage = buildCurrentStateStorage(walSegmentsCnt, segmentArchivedStorage);
- segmentCompressStorage = buildCompressStorage(segmentArchivedStorage, compactionEnabled);
+ segmentArchivedStorage = new SegmentArchivedStorage(segmentLockStorage);
+
+ segmentCurrStateStorage = new SegmentCurrentStateStorage(walSegmentsCnt);
+ segmentCompressStorage = new SegmentCompressStorage(compactionEnabled);
+
+ segmentArchivedStorage.addObserver(segmentCurrStateStorage::onSegmentArchived);
+ segmentArchivedStorage.addObserver(segmentCompressStorage::onSegmentArchived);
+
+ segmentLockStorage.addObserver(segmentArchivedStorage::onSegmentUnlocked);
}
/**
@@ -133,20 +138,6 @@ public class SegmentAware {
}
/**
- * @param idx Minimum raw segment index that should be preserved from deletion.
- */
- public void keepUncompressedIdxFrom(long idx) {
- segmentCompressStorage.keepUncompressedIdxFrom(idx);
- }
-
- /**
- * @return Minimum raw segment index that should be preserved from deletion.
- */
- public long keepUncompressedIdxFrom() {
- return segmentCompressStorage.keepUncompressedIdxFrom();
- }
-
- /**
* Update current WAL index.
*
* @param curAbsWalIdx New current WAL index.
@@ -184,10 +175,14 @@ public class SegmentAware {
}
/**
+ * Segment reservation. It will be successful if segment is {@code >} than
+ * the {@link #minReserveIndex minimum}.
+ *
* @param absIdx Index for reservation.
+ * @return {@code True} if the reservation was successful.
*/
- public void reserve(long absIdx) {
- reservationStorage.reserve(absIdx);
+ public boolean reserve(long absIdx) {
+ return reservationStorage.reserve(absIdx);
}
/**
@@ -208,9 +203,9 @@ public class SegmentAware {
}
/**
- * Check if WAL segment locked (protected from move to archive)
+ * Check if WAL segment locked (protected from move to archive).
*
- * @param absIdx Index for check reservation.
+ * @param absIdx Index for check locking.
* @return {@code True} if index is locked.
*/
public boolean locked(long absIdx) {
@@ -218,27 +213,20 @@ public class SegmentAware {
}
/**
- * @param absIdx Segment absolute index.
- * @return <ul><li>{@code True} if can read, no lock is held, </li><li>{@code false} if work segment, need release
- * segment later, use {@link #releaseWorkSegment} for unlock</li> </ul>
- */
- public boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) {
- return lastArchivedAbsoluteIndex() >= absIdx || segmentLockStorage.lockWorkSegment(absIdx);
- }
-
- /**
- * Visible for test.
+ * Segment lock. It will be successful if segment is {@code >} than
+ * the {@link #lastArchivedAbsoluteIndex last archived}.
*
- * @param absIdx Segment absolute index. segment later, use {@link #releaseWorkSegment} for unlock</li> </ul>
+ * @param absIdx Index to lock.
+ * @return {@code True} if the lock was successful.
*/
- void lockWorkSegment(long absIdx) {
- segmentLockStorage.lockWorkSegment(absIdx);
+ public boolean lock(long absIdx) {
+ return segmentLockStorage.lockWorkSegment(absIdx);
}
/**
- * @param absIdx Segment absolute index.
+ * @param absIdx Index to unlock.
*/
- public void releaseWorkSegment(long absIdx) {
+ public void unlock(long absIdx) {
segmentLockStorage.releaseWorkSegment(absIdx);
}
@@ -274,4 +262,28 @@ public class SegmentAware {
segmentCurrStateStorage.forceInterrupt();
}
+
+ /**
+ * Increasing minimum segment index after that can be reserved.
+ * Value will be updated if it is greater than the current one.
+ * If segment is already reserved, the update will fail.
+ *
+ * @param absIdx Absolut segment index.
+ * @return {@code True} if update is successful.
+ */
+ public boolean minReserveIndex(long absIdx) {
+ return reservationStorage.minReserveIndex(absIdx);
+ }
+
+ /**
+ * Increasing minimum segment index after that can be locked.
+ * Value will be updated if it is greater than the current one.
+ * If segment is already reserved, the update will fail.
+ *
+ * @param absIdx Absolut segment index.
+ * @return {@code True} if update is successful.
+ */
+ public boolean minLockIndex(long absIdx) {
+ return segmentLockStorage.minLockIndex(absIdx);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
index 5d88e52..62fe69d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
@@ -30,9 +30,6 @@ public class SegmentCompressStorage {
/** Flag of interrupt waiting on this object. */
private volatile boolean interrupted;
- /** Manages last archived index, emulates archivation in no-archiver mode. */
- private final SegmentArchivedStorage segmentArchivedStorage;
-
/** If WAL compaction enabled. */
private final boolean compactionEnabled;
@@ -51,33 +48,16 @@ public class SegmentCompressStorage {
/** Compressed segment with maximal index. */
private long lastMaxCompressedIdx = -1L;
- /** Min uncompressed index to keep. */
- private volatile long minUncompressedIdxToKeep = -1L;
-
/**
- * @param segmentArchivedStorage Storage of last archived segment.
+ * Constructor.
+ *
* @param compactionEnabled If WAL compaction enabled.
*/
- private SegmentCompressStorage(SegmentArchivedStorage segmentArchivedStorage, boolean compactionEnabled) {
- this.segmentArchivedStorage = segmentArchivedStorage;
-
+ SegmentCompressStorage(boolean compactionEnabled) {
this.compactionEnabled = compactionEnabled;
}
/**
- * @param segmentArchivedStorage Storage of last archived segment.
- * @param compactionEnabled If WAL compaction enabled.
- */
- static SegmentCompressStorage buildCompressStorage(SegmentArchivedStorage segmentArchivedStorage,
- boolean compactionEnabled) {
- SegmentCompressStorage storage = new SegmentCompressStorage(segmentArchivedStorage, compactionEnabled);
-
- segmentArchivedStorage.addObserver(storage::onSegmentArchived);
-
- return storage;
- }
-
- /**
* Callback after segment compression finish.
*
* @param compressedIdx Index of compressed segment.
@@ -148,7 +128,7 @@ public class SegmentCompressStorage {
/**
* Callback for waking up compressor when new segment is archived.
*/
- private synchronized void onSegmentArchived(long lastAbsArchivedIdx) {
+ synchronized void onSegmentArchived(long lastAbsArchivedIdx) {
while (lastEnqueuedToCompressIdx < lastAbsArchivedIdx && compactionEnabled)
segmentsToCompress.add(++lastEnqueuedToCompressIdx);
@@ -156,20 +136,6 @@ public class SegmentCompressStorage {
}
/**
- * @param idx Minimum raw segment index that should be preserved from deletion.
- */
- void keepUncompressedIdxFrom(long idx) {
- minUncompressedIdxToKeep = idx;
- }
-
- /**
- * @return Minimum raw segment index that should be preserved from deletion.
- */
- long keepUncompressedIdxFrom() {
- return minUncompressedIdxToKeep;
- }
-
- /**
* Reset interrupted flag.
*/
public void reset() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java
index 7339497..6672879 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java
@@ -22,7 +22,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
/**
* Storage of absolute current segment index.
*/
-class SegmentCurrentStateStorage {
+class SegmentCurrentStateStorage extends SegmentObservable {
/** Flag of interrupt of waiting on this object. */
private volatile boolean interrupted;
@@ -32,38 +32,22 @@ class SegmentCurrentStateStorage {
/** Total WAL segments count. */
private final int walSegmentsCnt;
- /** Manages last archived index, emulates archivation in no-archiver mode. */
- private final SegmentArchivedStorage segmentArchivedStorage;
-
/**
* Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>. Incremented during rollover.
* Also may be directly set if WAL is resuming logging after start.
*/
private volatile long curAbsWalIdx = -1;
- /**
- * @param walSegmentsCnt Total WAL segments count.
- * @param segmentArchivedStorage Last archived segment storage.
- */
- private SegmentCurrentStateStorage(int walSegmentsCnt, SegmentArchivedStorage segmentArchivedStorage) {
- this.walSegmentsCnt = walSegmentsCnt;
- this.segmentArchivedStorage = segmentArchivedStorage;
- }
+ /** Last archived file absolute index. */
+ private volatile long lastAbsArchivedIdx = -1;
/**
+ * Constructor.
+ *
* @param walSegmentsCnt Total WAL segments count.
- * @param segmentArchivedStorage Last archived segment storage.
*/
- static SegmentCurrentStateStorage buildCurrentStateStorage(
- int walSegmentsCnt,
- SegmentArchivedStorage segmentArchivedStorage
- ) {
-
- SegmentCurrentStateStorage currStorage = new SegmentCurrentStateStorage(walSegmentsCnt, segmentArchivedStorage);
-
- segmentArchivedStorage.addObserver(currStorage::onSegmentArchived);
-
- return currStorage;
+ SegmentCurrentStateStorage(int walSegmentsCnt) {
+ this.walSegmentsCnt = walSegmentsCnt;
}
/**
@@ -87,13 +71,11 @@ class SegmentCurrentStateStorage {
* Waiting until archivation of next segment will be allowed.
*/
synchronized long waitNextSegmentForArchivation() throws IgniteInterruptedCheckedException {
- long lastArchivedSegment = segmentArchivedStorage.lastArchivedAbsoluteIndex();
-
//We can archive segment if it less than current work segment so for archivate lastArchiveSegment + 1
// we should be ensure that currentWorkSegment = lastArchiveSegment + 2
- awaitSegment(lastArchivedSegment + 2);
+ awaitSegment(lastAbsArchivedIdx + 2);
- return lastArchivedSegment + 1;
+ return lastAbsArchivedIdx + 1;
}
/**
@@ -102,23 +84,31 @@ class SegmentCurrentStateStorage {
*
* @return Next absolute segment index.
*/
- synchronized long nextAbsoluteSegmentIndex() throws IgniteInterruptedCheckedException {
- curAbsWalIdx++;
+ long nextAbsoluteSegmentIndex() throws IgniteInterruptedCheckedException {
+ long nextAbsIdx;
- notifyAll();
+ synchronized (this) {
+ curAbsWalIdx++;
- try {
- while (curAbsWalIdx - segmentArchivedStorage.lastArchivedAbsoluteIndex() > walSegmentsCnt && !forceInterrupted)
- wait();
- }
- catch (InterruptedException e) {
- throw new IgniteInterruptedCheckedException(e);
+ notifyAll();
+
+ try {
+ while (curAbsWalIdx - lastAbsArchivedIdx > walSegmentsCnt && !forceInterrupted)
+ wait();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedCheckedException(e);
+ }
+
+ if (forceInterrupted)
+ throw new IgniteInterruptedCheckedException("Interrupt waiting of change archived idx");
+
+ nextAbsIdx = curAbsWalIdx;
}
- if (forceInterrupted)
- throw new IgniteInterruptedCheckedException("Interrupt waiting of change archived idx");
+ notifyObservers(nextAbsIdx);
- return curAbsWalIdx;
+ return nextAbsIdx;
}
/**
@@ -126,10 +116,14 @@ class SegmentCurrentStateStorage {
*
* @param curAbsWalIdx New current WAL index.
*/
- synchronized void curAbsWalIdx(long curAbsWalIdx) {
- this.curAbsWalIdx = curAbsWalIdx;
+ void curAbsWalIdx(long curAbsWalIdx) {
+ synchronized (this) {
+ this.curAbsWalIdx = curAbsWalIdx;
- notifyAll();
+ notifyAll();
+ }
+
+ notifyObservers(curAbsWalIdx);
}
/**
@@ -160,8 +154,12 @@ class SegmentCurrentStateStorage {
/**
* Callback for waking up awaiting when new segment is archived.
+ *
+ * @param lastAbsArchivedIdx Last archived file absolute index.
*/
- private synchronized void onSegmentArchived(long lastAbsArchivedIdx) {
+ synchronized void onSegmentArchived(long lastAbsArchivedIdx) {
+ this.lastAbsArchivedIdx = lastAbsArchivedIdx;
+
notifyAll();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
index 6588769..a5a7948 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
@@ -29,7 +29,10 @@ public class SegmentLockStorage extends SegmentObservable {
* Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may come from
* {@link FileWriteAheadLogManager.RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>.
*/
- private Map<Long, Integer> locked = new ConcurrentHashMap<>();
+ private final Map<Long, Integer> locked = new ConcurrentHashMap<>();
+
+ /** Maximum segment index that can be locked. */
+ private volatile long minLockIdx = -1;
/**
* Check if WAL segment locked (protected from move to archive)
@@ -37,17 +40,22 @@ public class SegmentLockStorage extends SegmentObservable {
* @param absIdx Index for check reservation.
* @return {@code True} if index is locked.
*/
- public boolean locked(long absIdx) {
+ boolean locked(long absIdx) {
return locked.containsKey(absIdx);
}
/**
- * @param absIdx Segment absolute index.
- * @return <ul><li>{@code True} if can read, no lock is held, </li><li>{@code false} if work segment, need release
- * segment later, use {@link #releaseWorkSegment} for unlock</li> </ul>
+ * Segment lock. It will be successful if segment is {@code >} than the {@link #minLockIdx minimum}.
+ *
+ * @param absIdx Index to lock.
+ * @return {@code True} if the lock was successful.
*/
- boolean lockWorkSegment(long absIdx) {
- locked.compute(absIdx, (idx, count) -> count == null ? 1 : count + 1);
+ synchronized boolean lockWorkSegment(long absIdx) {
+ if (absIdx > minLockIdx) {
+ locked.merge(absIdx, 1, Integer::sum);
+
+ return true;
+ }
return false;
}
@@ -64,4 +72,21 @@ public class SegmentLockStorage extends SegmentObservable {
notifyObservers(absIdx);
}
+
+ /**
+ * Increasing minimum segment index that can be locked.
+ * Value will be updated if it is greater than the current one.
+ * If segment is already locked, the update will fail.
+ *
+ * @param absIdx Absolut segment index.
+ * @return {@code True} if update is successful.
+ */
+ synchronized boolean minLockIndex(long absIdx) {
+ if (locked(absIdx))
+ return false;
+
+ minLockIdx = Math.max(minLockIdx, absIdx);
+
+ return true;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
index 50c2bbf..42eece7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
@@ -27,13 +27,25 @@ class SegmentReservationStorage {
* Maps absolute segment index to reservation counter. If counter > 0 then we wouldn't delete all segments which has
* index >= reserved segment index. Guarded by {@code this}.
*/
- private NavigableMap<Long, Integer> reserved = new TreeMap<>();
+ private final NavigableMap<Long, Integer> reserved = new TreeMap<>();
+
+ /** Maximum segment index that can be reserved. */
+ private long minReserveIdx = -1;
/**
+ * Segment reservation. It will be successful if segment is {@code >} than the {@link #minReserveIdx minimum}.
+ *
* @param absIdx Index for reservation.
+ * @return {@code True} if the reservation was successful.
*/
- synchronized void reserve(long absIdx) {
- reserved.merge(absIdx, 1, (a, b) -> a + b);
+ synchronized boolean reserve(long absIdx) {
+ if (absIdx > minReserveIdx) {
+ reserved.merge(absIdx, 1, Integer::sum);
+
+ return true;
+ }
+
+ return false;
}
/**
@@ -59,4 +71,21 @@ class SegmentReservationStorage {
else
reserved.put(absIdx, cur - 1);
}
+
+ /**
+ * Increasing minimum segment index that can be reserved.
+ * Value will be updated if it is greater than the current one.
+ * If segment is already reserved, the update will fail.
+ *
+ * @param absIdx Absolut segment index.
+ * @return {@code True} if update is successful.
+ */
+ synchronized boolean minReserveIndex(long absIdx) {
+ if (reserved(absIdx))
+ return false;
+
+ minReserveIdx = Math.max(minReserveIdx, absIdx);
+
+ return true;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java
index 6bb4786..13a905f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.persistence.wal.io;
+import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
@@ -69,19 +70,29 @@ final class LockedReadFileInput extends SimpleFileInput {
if (available >= requested)
return;
- boolean readArchive = segmentAware.checkCanReadArchiveOrReserveWorkSegment(segmentId);
+ // Segment deletion protection.
+ if (!segmentAware.reserve(segmentId))
+ throw new FileNotFoundException("Segment does not exist: " + segmentId);
+
try {
- if (readArchive && !isLastReadFromArchive) {
- isLastReadFromArchive = true;
+ // Protection against transferring a segment to the archive by #archiver.
+ boolean readArchive = !segmentAware.lock(segmentId);
+ try {
+ if (readArchive && !isLastReadFromArchive) {
+ isLastReadFromArchive = true;
- refreshIO();
- }
+ refreshIO();
+ }
- super.ensure(requested);
+ super.ensure(requested);
+ }
+ finally {
+ if (!readArchive)
+ segmentAware.unlock(segmentId);
+ }
}
finally {
- if (!readArchive)
- segmentAware.releaseWorkSegment(segmentId);
+ segmentAware.release(segmentId);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/misc/VisorWalTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/misc/VisorWalTask.java
index 19b3e92..d535751 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/misc/VisorWalTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/misc/VisorWalTask.java
@@ -237,7 +237,7 @@ public class VisorWalTask extends VisorMultiNodeTask<VisorWalTaskArg, VisorWalTa
dbMgr.onWalTruncated(lowBoundForTruncate);
- int num = wal.truncate(null, lowBoundForTruncate);
+ int num = wal.truncate(lowBoundForTruncate);
if (walFiles != null) {
sortWalFiles(walFiles);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java
index 65dd776..e7c212f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence.db;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -28,11 +29,13 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
* Test correctness of truncating unused WAL segments.
@@ -40,33 +43,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT
@WithSystemProperty(key = IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, value = "2")
public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setConsistentId(gridName);
-
- CacheConfiguration<Integer, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
-
- ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
-
- cfg.setCacheConfiguration(ccfg);
-
- DataStorageConfiguration dbCfg = new DataStorageConfiguration();
-
- cfg.setDataStorageConfiguration(dbCfg);
-
- dbCfg.setWalSegmentSize(1024 * 1024)
- .setMaxWalArchiveSize(Long.MAX_VALUE)
- .setWalSegments(10)
- .setWalMode(WALMode.LOG_ONLY)
- .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
- .setMaxSize(100 * 1024 * 1024)
- .setPersistenceEnabled(true));
-
- return cfg;
- }
-
- /** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
stopAllGrids();
@@ -80,6 +56,28 @@ public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest {
cleanPersistenceDir();
}
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setConsistentId(igniteInstanceName)
+ .setCacheConfiguration(
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
+ ).setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setCheckpointFrequency(Long.MAX_VALUE)
+ .setWalMode(WALMode.LOG_ONLY)
+ .setMaxWalArchiveSize(Long.MAX_VALUE)
+ .setWalSegmentSize(1024 * 1024)
+ .setWalSegments(10)
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setMaxSize(100 * 1024 * 1024)
+ .setPersistenceEnabled(true)
+ )
+ );
+ }
+
/**
* Tests that range reserved method return correct number of reserved WAL segments.
*
@@ -87,18 +85,17 @@ public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest {
*/
@Test
public void testWalManagerRangeReservation() throws Exception {
- IgniteEx ig0 = prepareGrid(4);
+ IgniteEx n = prepareGrid(2);
- GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ig0.context().cache().context()
- .database();
+ IgniteWriteAheadLogManager wal = n.context().cache().context().wal();
- IgniteWriteAheadLogManager wal = ig0.context().cache().context().wal();
+ assertNotNull(wal);
- long resIdx = getReservedWalSegmentIndex(dbMgr);
+ long resIdx = getReservedWalSegmentIndex(wal);
assertTrue("Expected that at least resIdx greater than 0, real is " + resIdx, resIdx > 0);
- WALPointer lowPtr = dbMgr.checkpointHistory().firstCheckpointPointer();
+ WALPointer lowPtr = lastCheckpointPointer(n);
assertTrue("Expected that dbMbr returns valid resIdx", lowPtr.index() == resIdx);
@@ -117,25 +114,24 @@ public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest {
*/
@Test
public void testWalDoesNotTruncatedWhenSegmentReserved() throws Exception {
- IgniteEx ig0 = prepareGrid(4);
+ IgniteEx n = prepareGrid(2);
- GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ig0.context().cache().context()
- .database();
+ IgniteWriteAheadLogManager wal = n.context().cache().context().wal();
- IgniteWriteAheadLogManager wal = ig0.context().cache().context().wal();
+ assertNotNull(wal);
- long resIdx = getReservedWalSegmentIndex(dbMgr);
+ long resIdx = getReservedWalSegmentIndex(wal);
assertTrue("Expected that at least resIdx greater than 0, real is " + resIdx, resIdx > 0);
- WALPointer lowPtr = dbMgr.checkpointHistory().firstCheckpointPointer();
+ WALPointer lowPtr = lastCheckpointPointer(n);
assertTrue("Expected that dbMbr returns valid resIdx", lowPtr.index() == resIdx);
// Reserve previous WAL segment.
wal.reserve(new WALPointer(resIdx - 1, 0, 0));
- int numDel = wal.truncate(null, lowPtr);
+ int numDel = wal.truncate(lowPtr);
int expNumDel = (int)resIdx - 1;
@@ -143,6 +139,48 @@ public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest {
}
/**
+ * Checking that there will be no truncation of segments required for binary recovery.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNotTruncateSegmentsForBinaryRecovery() throws Exception {
+ IgniteEx n = prepareGrid(1);
+
+ IgniteWriteAheadLogManager wal = n.context().cache().context().wal();
+
+ assertNotNull(wal);
+
+ long resIdx = getReservedWalSegmentIndex(wal);
+ assertTrue(resIdx > 3);
+
+ WALPointer lastCheckpointPtr = lastCheckpointPointer(n);
+ assertEquals(lastCheckpointPtr.index(), resIdx);
+
+ wal.notchLastCheckpointPtr(new WALPointer(1, 0, 0));
+
+ if (compactionEnabled(n))
+ assertTrue(waitForCondition(() -> wal.lastCompactedSegment() >= 1, 10_000));
+
+ int truncated = wal.truncate(lastCheckpointPtr);
+ assertTrue("truncated: " + truncated, truncated >= 1);
+
+ truncated = wal.truncate(lastCheckpointPtr);
+ assertEquals(0, truncated);
+
+ wal.notchLastCheckpointPtr(new WALPointer(2, 0, 0));
+
+ if (compactionEnabled(n))
+ assertTrue(waitForCondition(() -> wal.lastCompactedSegment() >= 2, 10_000));
+
+ truncated = wal.truncate(lastCheckpointPtr);
+ assertTrue("truncated: " + truncated, truncated >= 1);
+
+ truncated = wal.truncate(lastCheckpointPtr);
+ assertEquals(0, truncated);
+ }
+
+ /**
* Starts grid and populates test data.
*
* @param cnt Grid count.
@@ -152,7 +190,8 @@ public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest {
private IgniteEx prepareGrid(int cnt) throws Exception {
IgniteEx ig0 = startGrids(cnt);
- ig0.cluster().active(true);
+ ig0.cluster().state(ClusterState.ACTIVE);
+ awaitPartitionMapExchange();
IgniteCache<Object, Object> cache = ig0.cache(DEFAULT_CACHE_NAME);
@@ -167,11 +206,32 @@ public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest {
}
/**
- * Get index of reserved WAL segment by checkpointer.
+ * Get index of reserved WAL segment by checkpoint.
*
* @param dbMgr Database shared manager.
*/
- private long getReservedWalSegmentIndex(GridCacheDatabaseSharedManager dbMgr) {
- return dbMgr.checkpointHistory().firstCheckpointPointer().index();
+ private long getReservedWalSegmentIndex(IgniteWriteAheadLogManager dbMgr) {
+ return ((WALPointer)GridTestUtils.getFieldValueHierarchy(dbMgr, "lastCheckpointPtr")).index();
+ }
+
+ /**
+ * Getting WAL pointer last checkpoint.
+ *
+ * @param n Node.
+ * @return WAL pointer last checkpoint.
+ */
+ private WALPointer lastCheckpointPointer(IgniteEx n) {
+ return ((GridCacheDatabaseSharedManager)n.context().cache().context().database())
+ .checkpointHistory().lastCheckpoint().checkpointMark();
+ }
+
+ /**
+ * Checking that wal compaction enabled.
+ *
+ * @param n Node.
+ * @return {@code True} if enabled.
+ */
+ private boolean compactionEnabled(IgniteEx n) {
+ return n.configuration().getDataStorageConfiguration().isWalCompactionEnabled();
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
index f9b029e..8d65954 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal;
import java.io.File;
import java.nio.channels.Channel;
-import java.nio.file.Paths;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -75,30 +74,30 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest {
/** Segment file size. */
private static final int SEGMENT_SIZE = 1024 * 1024;
+ /** Node dir. */
+ private static final String NODE_DIR = "NODE";
+
/** WAL segment file sub directory. */
- private static final String WORK_SUB_DIR = "/NODE/wal";
+ private static final String WORK_SUB_DIR = String.join(File.separator, "", NODE_DIR, "wal");
/** WAL archive segment file sub directory. */
- private static final String ARCHIVE_SUB_DIR = "/NODE/walArchive";
+ private static final String ARCHIVE_SUB_DIR = String.join(File.separator, "", NODE_DIR, "walArchive");
/** Serializer versions for check. */
- private int[] checkSerializerVers = new int[] {
- 1,
- 2
- };
+ private final int[] checkSerializerVers = new int[] {1, 2};
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
- U.delete(Paths.get(U.defaultWorkDirectory()));
+ deleteNodeDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
- U.delete(Paths.get(U.defaultWorkDirectory()));
+ deleteNodeDir();
}
/**
@@ -114,6 +113,40 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest {
}
/**
+ * Test for check invariant, size of SWITCH_SEGMENT_RECORD should be 1 byte.
+ *
+ * @throws Exception If some thing failed.
+ */
+ @Test
+ public void testInvariantSwitchSegment() throws Exception {
+ for (int serVer : checkSerializerVers) {
+ try {
+ checkInvariantSwitchSegment(serVer);
+ }
+ finally {
+ deleteNodeDir();
+ }
+ }
+ }
+
+ /**
+ * Test for check switch segment from work dir to archive dir during iteration.
+ *
+ * @throws Exception If some thing failed.
+ */
+ @Test
+ public void testSwitchReadingSegmentFromWorkToArchive() throws Exception {
+ for (int serVer : checkSerializerVers) {
+ try {
+ checkSwitchReadingSegmentDuringIteration(serVer);
+ }
+ finally {
+ deleteNodeDir();
+ }
+ }
+ }
+
+ /**
* @param serVer WAL serializer version.
* @throws Exception If some thing failed.
*/
@@ -162,40 +195,6 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest {
}
/**
- * Test for check invariant, size of SWITCH_SEGMENT_RECORD should be 1 byte.
- *
- * @throws Exception If some thing failed.
- */
- @Test
- public void testInvariantSwitchSegment() throws Exception {
- for (int serVer : checkSerializerVers) {
- try {
- checkInvariantSwitchSegment(serVer);
- }
- finally {
- U.delete(Paths.get(U.defaultWorkDirectory()));
- }
- }
- }
-
- /**
- * Test for check switch segment from work dir to archive dir during iteration.
- *
- * @throws Exception If some thing failed.
- */
- @Test
- public void testSwitchReadingSegmentFromWorkToArchive() throws Exception {
- for (int serVer : checkSerializerVers) {
- try {
- checkSwitchReadingSegmentDuringIteration(serVer);
- }
- finally {
- U.delete(Paths.get(U.defaultWorkDirectory()));
- }
- }
- }
-
- /**
* @param serVer WAL serializer version.
* @throws Exception If some thing failed.
*/
@@ -270,8 +269,10 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest {
walMgr.flush(null, true);
+ SegmentAware segmentAware = GridTestUtils.getFieldValue(walMgr, "segmentAware");
+
// Await archiver move segment to WAL archive.
- Thread.sleep(5000);
+ waitForCondition(() -> segmentAware.lastArchivedAbsoluteIndex() == 0, 5_000);
// If switchSegmentRecordSize more that 1, it mean that invariant is broke.
// Filling tail some garbage. Simulate tail garbage on rotate segment in WAL work directory.
@@ -300,7 +301,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest {
seg0.close();
}
- int expectedRecords = recordsToWrite;
+ int expRecords = recordsToWrite;
int actualRecords = 0;
// Check that switch segment works as expected and all record is reachable.
@@ -315,7 +316,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest {
}
}
- Assert.assertEquals("Not all records read during iteration.", expectedRecords, actualRecords);
+ Assert.assertEquals("Not all records read during iteration.", expRecords, actualRecords);
}
/**
@@ -340,75 +341,72 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest {
SegmentAware segmentAware = GridTestUtils.getFieldValue(walMgr, "segmentAware");
- //guard from archivation before iterator would be created.
- segmentAware.checkCanReadArchiveOrReserveWorkSegment(0);
+ // Guard from archiving before iterator would be created.
+ assertTrue(segmentAware.lock(0));
for (int i = 0; i < recordsToWrite; i++)
walMgr.log(new MetastoreDataRecord(rec.key(), rec.value()));
walMgr.flush(null, true);
- int expectedRecords = recordsToWrite;
AtomicInteger actualRecords = new AtomicInteger(0);
AtomicReference<String> startedSegmentPath = new AtomicReference<>();
AtomicReference<String> finishedSegmentPath = new AtomicReference<>();
- CountDownLatch startedIteratorLatch = new CountDownLatch(1);
+ CountDownLatch startedIterLatch = new CountDownLatch(1);
CountDownLatch finishedArchivedLatch = new CountDownLatch(1);
- IgniteInternalFuture<Object> future = GridTestUtils.runAsync(
- () -> {
- // Check that switch segment works as expected and all record is reachable.
- try (WALIterator it = walMgr.replay(null)) {
- Object handle = getFieldValueHierarchy(it, "currWalSegment");
- FileInput in = getFieldValueHierarchy(handle, "in");
- Object delegate = getFieldValueHierarchy(in.io(), "delegate");
- Channel ch = getFieldValueHierarchy(delegate, "ch");
- String path = getFieldValueHierarchy(ch, "path");
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+ // Check that switch segment works as expected and all record is reachable.
+ try (WALIterator it = walMgr.replay(null)) {
+ Object handle = getFieldValueHierarchy(it, "currWalSegment");
+ FileInput in = getFieldValueHierarchy(handle, "in");
+ Object delegate = getFieldValueHierarchy(in.io(), "delegate");
+ Channel ch = getFieldValueHierarchy(delegate, "ch");
+ String path = getFieldValueHierarchy(ch, "path");
- startedSegmentPath.set(path);
+ startedSegmentPath.set(path);
- startedIteratorLatch.countDown();
+ startedIterLatch.countDown();
- while (it.hasNext()) {
- IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
+ while (it.hasNext()) {
+ IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
- WALRecord rec0 = tup.get2();
+ WALRecord rec0 = tup.get2();
- if (rec0.type() == METASTORE_DATA_RECORD)
- actualRecords.incrementAndGet();
+ if (rec0.type() == METASTORE_DATA_RECORD)
+ actualRecords.incrementAndGet();
- finishedArchivedLatch.await();
- }
-
- in = getFieldValueHierarchy(handle, "in");
- delegate = getFieldValueHierarchy(in.io(), "delegate");
- ch = getFieldValueHierarchy(delegate, "ch");
- path = getFieldValueHierarchy(ch, "path");
-
- finishedSegmentPath.set(path);
+ finishedArchivedLatch.await();
}
- return null;
+ in = getFieldValueHierarchy(handle, "in");
+ delegate = getFieldValueHierarchy(in.io(), "delegate");
+ ch = getFieldValueHierarchy(delegate, "ch");
+ path = getFieldValueHierarchy(ch, "path");
+
+ finishedSegmentPath.set(path);
}
- );
- startedIteratorLatch.await();
+ return null;
+ });
+
+ startedIterLatch.await();
- segmentAware.releaseWorkSegment(0);
+ segmentAware.unlock(0);
waitForCondition(() -> segmentAware.lastArchivedAbsoluteIndex() == 0, 5000);
finishedArchivedLatch.countDown();
- future.get();
+ fut.get();
//should started iteration from work directory but finish from archive directory.
- assertEquals(workDir + WORK_SUB_DIR + "/0000000000000000.wal", startedSegmentPath.get());
- assertEquals(workDir + ARCHIVE_SUB_DIR + "/0000000000000000.wal", finishedSegmentPath.get());
+ assertEquals(workDir + WORK_SUB_DIR + File.separator + "0000000000000000.wal", startedSegmentPath.get());
+ assertEquals(workDir + ARCHIVE_SUB_DIR + File.separator + "0000000000000000.wal", finishedSegmentPath.get());
- Assert.assertEquals("Not all records read during iteration.", expectedRecords, actualRecords.get());
+ Assert.assertEquals("Not all records read during iteration.", recordsToWrite, actualRecords.get());
}
/***
@@ -493,4 +491,11 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest {
return new T2<>(walMgr, recordSerializer);
}
+
+ /**
+ * Delete node dir.
+ */
+ private void deleteNodeDir() throws Exception {
+ U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), NODE_DIR, false));
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index 2e78ad0..10634cc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -95,7 +95,7 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager {
}
/** {@inheritDoc} */
- @Override public int truncate(WALPointer low, WALPointer high) {
+ @Override public int truncate(@Nullable WALPointer high) {
return 0;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
index 0bd9fcb..60663ef 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
@@ -65,9 +65,8 @@ public class SegmentAwareTest {
int i = iterationCnt;
while (i-- > 0) {
- aware.lockWorkSegment(segmentToHandle);
-
- aware.releaseWorkSegment(segmentToHandle);
+ if (aware.lock(segmentToHandle))
+ aware.unlock(segmentToHandle);
}
});
@@ -388,12 +387,12 @@ public class SegmentAwareTest {
//given: thread which awaited segment.
SegmentAware aware = new SegmentAware(10, false);
- aware.checkCanReadArchiveOrReserveWorkSegment(5);
+ assertTrue(aware.lock(5));
IgniteInternalFuture future = awaitThread(() -> aware.markAsMovedToArchive(5));
//when: release exact expected work segment.
- aware.releaseWorkSegment(5);
+ aware.unlock(5);
//then: waiting should finish immediately.
future.get(20);
@@ -406,7 +405,8 @@ public class SegmentAwareTest {
public void testMarkAsMovedToArchive_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
SegmentAware aware = new SegmentAware(10, false);
- aware.checkCanReadArchiveOrReserveWorkSegment(5);
+
+ assertTrue(aware.lock(5));
IgniteInternalFuture future = awaitThread(() -> aware.markAsMovedToArchive(5));
@@ -521,6 +521,10 @@ public class SegmentAwareTest {
//given: thread which awaited segment.
SegmentAware aware = new SegmentAware(10, false);
+ // Set limits.
+ aware.curAbsWalIdx(10);
+ aware.minReserveIndex(0);
+
//when: reserve one segment twice and one segment once.
aware.reserve(5);
aware.reserve(5);
@@ -586,8 +590,8 @@ public class SegmentAwareTest {
SegmentAware aware = new SegmentAware(10, false);
//when: lock one segment twice.
- aware.checkCanReadArchiveOrReserveWorkSegment(5);
- aware.checkCanReadArchiveOrReserveWorkSegment(5);
+ assertTrue(aware.lock(5));
+ assertTrue(aware.lock(5));
//then: exact one segment should locked.
assertTrue(aware.locked(5));
@@ -595,7 +599,7 @@ public class SegmentAwareTest {
assertFalse(aware.locked(4));
//when: release segment once.
- aware.releaseWorkSegment(5);
+ aware.unlock(5);
//then: nothing to change, segment still locked.
assertTrue(aware.locked(5));
@@ -603,7 +607,7 @@ public class SegmentAwareTest {
assertFalse(aware.locked(4));
//when: release segment.
- aware.releaseWorkSegment(5);
+ aware.unlock(5);
//then: all segments should be unlocked.
assertFalse(aware.locked(5));
@@ -619,10 +623,9 @@ public class SegmentAwareTest {
//given: thread which awaited segment.
SegmentAware aware = new SegmentAware(10, false);
- aware.checkCanReadArchiveOrReserveWorkSegment(5);
+ assertTrue(aware.lock(5));
try {
-
- aware.releaseWorkSegment(7);
+ aware.unlock(7);
}
catch (AssertionError e) {
return;
@@ -632,6 +635,50 @@ public class SegmentAwareTest {
}
/**
+ * Check that the reservation border is working correctly.
+ */
+ @Test
+ public void testReservationBorder() {
+ SegmentAware aware = new SegmentAware(10, false);
+
+ assertTrue(aware.reserve(0));
+ assertTrue(aware.reserve(1));
+
+ assertFalse(aware.minReserveIndex(0));
+ assertFalse(aware.minReserveIndex(1));
+
+ aware.release(0);
+
+ assertTrue(aware.minReserveIndex(0));
+ assertFalse(aware.minReserveIndex(1));
+
+ assertFalse(aware.reserve(0));
+ assertTrue(aware.reserve(1));
+ }
+
+ /**
+ * Check that the lock border is working correctly.
+ */
+ @Test
+ public void testLockBorder() {
+ SegmentAware aware = new SegmentAware(10, false);
+
+ assertTrue(aware.lock(0));
+ assertTrue(aware.lock(1));
+
+ assertFalse(aware.minLockIndex(0));
+ assertFalse(aware.minLockIndex(1));
+
+ aware.unlock(0);
+
+ assertTrue(aware.minLockIndex(0));
+ assertFalse(aware.minLockIndex(1));
+
+ assertFalse(aware.lock(0));
+ assertTrue(aware.lock(1));
+ }
+
+ /**
* Assert that future is still not finished.
*
* @param future Future to check.