You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/07/13 14:39:30 UTC
[ignite] branch master updated: IGNITE-14952 Cancelling WAL
segments reservation when max WAL archive size is reached - Fixes 9198
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b2c4aff IGNITE-14952 Cancelling WAL segments reservation when max WAL archive size is reached - Fixes 9198
b2c4aff is described below
commit b2c4affb8275b62f71ff65f83807b2da1bf9998b
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Tue Jul 13 17:38:23 2021 +0300
IGNITE-14952 Cancelling WAL segments reservation when max WAL archive size is reached - Fixes 9198
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../pagemem/wal/IgniteWriteAheadLogManager.java | 4 +
.../persistence/wal/FileWriteAheadLogManager.java | 66 +++--
.../wal/aware/SegmentArchiveSizeStorage.java | 161 +++++++++--
.../cache/persistence/wal/aware/SegmentAware.java | 48 ++--
.../wal/aware/SegmentReservationStorage.java | 37 ++-
.../persistence/wal/aware/SegmentAwareTest.java | 317 ++++++++++++++++++---
6 files changed, 517 insertions(+), 116 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index eafd228..044a79a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.pagemem.wal;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManager;
@@ -133,7 +134,10 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni
/**
* Invoke this method to reserve WAL history since provided pointer and prevent it's deletion.
*
+ * NOTE: If the {@link DataStorageConfiguration#getMaxWalArchiveSize()} is exceeded, the segment will be released.
+ *
* @param start WAL pointer.
+ * @return {@code True} if the reservation was successful.
*/
public boolean reserve(WALPointer start);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 673accd..33ca3c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -501,7 +501,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
});
}
- segmentAware = new SegmentAware(dsCfg.getWalSegments(), dsCfg.isWalCompactionEnabled(), log);
+ segmentAware = new SegmentAware(
+ log,
+ dsCfg.getWalSegments(),
+ dsCfg.isWalCompactionEnabled(),
+ minWalArchiveSize,
+ maxWalArchiveSize
+ );
// We have to initialize compressor before archiver in order to setup already compressed segments.
// Otherwise, FileArchiver initialization will trigger redundant work for FileCompressor.
@@ -568,7 +574,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
segmentAware.reset();
segmentAware.resetWalArchiveSizes();
- segmentAware.addCurrentWalArchiveSize(totalSize(walArchiveFiles()));
+
+ for (FileDescriptor descriptor : walArchiveFiles())
+ segmentAware.addSize(descriptor.idx, descriptor.file.length());
if (isArchiverEnabled()) {
assert archiver != null : "FileArchiver should be initialized.";
@@ -1101,8 +1109,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
else {
deleted++;
- segmentSize.remove(desc.idx());
- segmentAware.addCurrentWalArchiveSize(-len);
+ long idx = desc.idx();
+
+ segmentSize.remove(idx);
+ segmentAware.addSize(idx, -len);
}
// Bump up the oldest archive segment index.
@@ -1307,8 +1317,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
switchSegmentRecordOffset.set(idx, hnd.getSwitchSegmentRecordOffset());
}
+ long idx = cur.getSegmentId() + 1;
+ long currSize = 0;
+ long reservedSize = maxWalSegmentSize;
+
if (archiver == null)
- segmentAware.addReservedWalArchiveSize(maxWalSegmentSize);
+ segmentAware.addSize(idx, reservedSize);
FileWriteHandle next;
try {
@@ -1328,14 +1342,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
assert ptr != null;
}
- segmentSize.put(next.getSegmentId(), maxWalSegmentSize);
-
- if (archiver == null)
- segmentAware.addCurrentWalArchiveSize(maxWalSegmentSize);
+ currSize = reservedSize;
+ segmentSize.put(idx, currSize);
}
finally {
if (archiver == null)
- segmentAware.addReservedWalArchiveSize(-maxWalSegmentSize);
+ segmentAware.addSize(idx, currSize - reservedSize);
}
if (next.getSegmentId() - lastCheckpointPtr.index() >= maxSegCountWithoutCheckpoint)
@@ -2025,8 +2037,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
long offs = switchSegmentRecordOffset.getAndSet((int)segIdx, 0);
long origLen = origFile.length();
+ long currSize = 0;
long reservedSize = offs > 0 && offs < origLen ? offs : origLen;
- segmentAware.addReservedWalArchiveSize(reservedSize);
+
+ segmentAware.addSize(absIdx, reservedSize);
try {
if (offs > 0 && offs < origLen)
@@ -2045,8 +2059,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
}
- segmentSize.put(absIdx, dstFile.length());
- segmentAware.addCurrentWalArchiveSize(dstFile.length());
+ currSize = dstFile.length();
+ segmentSize.put(absIdx, currSize);
}
catch (IOException e) {
deleteArchiveFiles(dstFile, dstTmpFile);
@@ -2056,7 +2070,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e);
}
finally {
- segmentAware.addReservedWalArchiveSize(-reservedSize);
+ segmentAware.addSize(absIdx, currSize - reservedSize);
}
if (log.isInfoEnabled()) {
@@ -2252,8 +2266,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
File raw = new File(walArchiveDir, segmentFileName);
+ long currSize = 0;
long reservedSize = raw.length();
- segmentAware.addReservedWalArchiveSize(reservedSize);
+
+ segmentAware.addSize(segIdx, reservedSize);
try {
deleteObsoleteRawSegments();
@@ -2269,12 +2285,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
f0.force();
}
- long zipLen = zip.length();
+ currSize = zip.length();
+ segmentSize.put(segIdx, currSize);
- segmentSize.put(segIdx, zipLen);
- segmentAware.addCurrentWalArchiveSize(zipLen);
-
- metrics.onWalSegmentCompressed(zipLen);
+ metrics.onWalSegmentCompressed(currSize);
segmentAware.onSegmentCompressed(segIdx);
@@ -2292,7 +2306,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
segmentAware.onSegmentCompressed(segIdx);
}
finally {
- segmentAware.addReservedWalArchiveSize(-reservedSize);
+ segmentAware.addSize(segIdx, currSize - reservedSize);
}
}
catch (IgniteInterruptedCheckedException ignore) {
@@ -2403,7 +2417,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
return;
if (desc.idx < lastCheckpointPtr.index() && duplicateIndices.contains(desc.idx))
- segmentAware.addCurrentWalArchiveSize(-deleteArchiveFiles(desc.file));
+ segmentAware.addSize(desc.idx, -deleteArchiveFiles(desc.file));
}
}
}
@@ -2458,8 +2472,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
File unzipTmp = new File(walArchiveDir, segmentFileName + TMP_SUFFIX);
File unzip = new File(walArchiveDir, segmentFileName);
+ long currSize = 0;
long reservedSize = U.uncompressedSize(zip);
- segmentAware.addReservedWalArchiveSize(reservedSize);
+
+ segmentAware.addSize(segmentToDecompress, reservedSize);
IgniteCheckedException ex = null;
@@ -2477,7 +2493,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
Files.move(unzipTmp.toPath(), unzip.toPath());
- segmentAware.addCurrentWalArchiveSize(unzip.length());
+ currSize = unzip.length();
}
catch (IOException e) {
deleteArchiveFiles(unzipTmp);
@@ -2492,7 +2508,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
}
finally {
- segmentAware.addReservedWalArchiveSize(-reservedSize);
+ segmentAware.addSize(segmentToDecompress, currSize - reservedSize);
}
updateHeartbeat();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchiveSizeStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchiveSizeStorage.java
index 76d6022..82d1b15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchiveSizeStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchiveSizeStorage.java
@@ -17,65 +17,154 @@
package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.configuration.DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE;
/**
* Storage WAL archive size.
* Allows to track the exceeding of the maximum archive size.
*/
class SegmentArchiveSizeStorage {
- /** Current WAL archive size in bytes. */
- private long curr;
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Current WAL archive size in bytes. Guarded by {@code this}. */
+ private long walArchiveSize;
+
+ /** Flag of interrupt waiting on this object. Guarded by {@code this}. */
+ private boolean interrupted;
+
+ /** Minimum size of the WAL archive in bytes. */
+ private final long minWalArchiveSize;
+
+ /** Maximum size of the WAL archive in bytes. */
+ private final long maxWalArchiveSize;
- /** Reserved WAL archive size in bytes. */
- private long reserved;
+ /** WAL archive size unlimited. */
+ private final boolean walArchiveUnlimited;
- /** Flag of interrupt waiting on this object. */
- private volatile boolean interrupted;
+ /**
+ * Segment sizes. Mapping: segment idx -> size in bytes. Guarded by {@code this}.
+ * {@code null} if {@link #walArchiveUnlimited} == {@code true}.
+ */
+ @Nullable private final Map<Long, Long> segmentSizes;
+
+ /**
+ * Segment reservations storage.
+ * {@code null} if {@link #walArchiveUnlimited} == {@code true}.
+ */
+ @Nullable private final SegmentReservationStorage reservationStorage;
/**
- * Adding current WAL archive size in bytes.
+ * Constructor.
*
- * @param size Size in bytes.
+ * @param minWalArchiveSize Minimum size of the WAL archive in bytes.
+ * @param maxWalArchiveSize Maximum size of the WAL archive in bytes
+ * or {@link DataStorageConfiguration#UNLIMITED_WAL_ARCHIVE}.
+ * @param reservationStorage Segment reservations storage.
*/
- synchronized void addCurrentSize(long size) {
- curr += size;
+ public SegmentArchiveSizeStorage(
+ IgniteLogger log,
+ long minWalArchiveSize,
+ long maxWalArchiveSize,
+ SegmentReservationStorage reservationStorage
+ ) {
+ this.log = log;
- if (size > 0)
- notifyAll();
+ this.minWalArchiveSize = minWalArchiveSize;
+ this.maxWalArchiveSize = maxWalArchiveSize;
+
+ if (maxWalArchiveSize != UNLIMITED_WAL_ARCHIVE) {
+ walArchiveUnlimited = false;
+
+ segmentSizes = new TreeMap<>();
+ this.reservationStorage = reservationStorage;
+ }
+ else {
+ walArchiveUnlimited = true;
+
+ segmentSizes = null;
+ this.reservationStorage = null;
+ }
}
/**
- * Adding reserved WAL archive size in bytes.
- * Defines a hint to determine if the maximum size is exceeded before a new segment is archived.
+ * Adds or updates information about size of a WAL segment in archive.
*
- * @param size Size in bytes.
+ * @param idx Absolut segment index.
+ * @param sizeChange Segment size in bytes. Could be positive (if segment is added to the archive)
+ * or negative (e.g. when it is removed from the archive).
*/
- synchronized void addReservedSize(long size) {
- reserved += size;
+ void changeSize(long idx, long sizeChange) {
+ long releaseIdx = -1;
+ int releaseCnt = 0;
+
+ synchronized (this) {
+ walArchiveSize += sizeChange;
+
+ if (!walArchiveUnlimited) {
+ segmentSizes.compute(idx, (i, size) -> {
+ long res = (size == null ? 0 : size) + sizeChange;
+
+ return res == 0 ? null : res;
+ });
+ }
+
+ if (sizeChange > 0) {
+ if (!walArchiveUnlimited && walArchiveSize >= maxWalArchiveSize) {
+ long size = 0;
- if (size > 0)
- notifyAll();
+ for (Map.Entry<Long, Long> e : segmentSizes.entrySet()) {
+ releaseIdx = e.getKey();
+ releaseCnt++;
+
+ if (walArchiveSize - (size += e.getValue()) < minWalArchiveSize)
+ break;
+ }
+ }
+
+ notifyAll();
+ }
+ }
+
+ if (releaseIdx != -1) {
+ if (log.isInfoEnabled()) {
+ log.info("Maximum size of the WAL archive exceeded, the segments will be forcibly released [" +
+ "maxWalArchiveSize=" + U.humanReadableByteCount(maxWalArchiveSize) + ", releasedSegmentCnt=" +
+ releaseCnt + ", lastReleasedSegmentIdx=" + releaseIdx + ']');
+ }
+
+ reservationStorage.forceRelease(releaseIdx);
+ }
}
/**
* Reset the current and reserved WAL archive sizes.
*/
synchronized void resetSizes() {
- curr = 0;
- reserved = 0;
+ walArchiveSize = 0;
+
+ if (!walArchiveUnlimited)
+ segmentSizes.clear();
}
/**
* Waiting for exceeding the maximum WAL archive size.
- * To track size of WAL archive, need to use {@link #addCurrentSize} and {@link #addReservedSize}.
+ * To track size of WAL archive, need to use {@link #changeSize}.
*
* @param max Maximum WAL archive size in bytes.
* @throws IgniteInterruptedCheckedException If it was interrupted.
*/
synchronized void awaitExceedMaxSize(long max) throws IgniteInterruptedCheckedException {
try {
- while (max - (curr + reserved) > 0 && !interrupted)
+ while (max - walArchiveSize > 0 && !interrupted)
wait();
}
catch (InterruptedException e) {
@@ -98,7 +187,31 @@ class SegmentArchiveSizeStorage {
/**
* Reset interrupted flag.
*/
- void reset() {
+ synchronized void reset() {
interrupted = false;
}
+
+ /**
+ * Getting current WAL archive size in bytes.
+ *
+ * @return Size in bytes.
+ */
+ synchronized long currentSize() {
+ return walArchiveSize;
+ }
+
+ /**
+ * Getting the size of the WAL segment of the archive in bytes.
+ *
+ * @return Size in bytes or {@code null} if the segment is absent or the archive is unlimited.
+ */
+ @Nullable Long segmentSize(long idx) {
+ if (walArchiveUnlimited)
+ return null;
+ else {
+ synchronized (this) {
+ return segmentSizes.get(idx);
+ }
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
index 48b16b0..ee43249 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
/**
@@ -48,17 +49,33 @@ public class SegmentAware {
/**
* Constructor.
*
+ * @param log Logger.
* @param walSegmentsCnt Total WAL segments count.
* @param compactionEnabled Is wal compaction enabled.
- * @param log Logger.
- */
- public SegmentAware(int walSegmentsCnt, boolean compactionEnabled, IgniteLogger log) {
+ * @param minWalArchiveSize Minimum size of the WAL archive in bytes
+ * or {@link DataStorageConfiguration#UNLIMITED_WAL_ARCHIVE}.
+ * @param maxWalArchiveSize Maximum size of the WAL archive in bytes
+ * or {@link DataStorageConfiguration#UNLIMITED_WAL_ARCHIVE}.
+ */
+ public SegmentAware(
+ IgniteLogger log,
+ int walSegmentsCnt,
+ boolean compactionEnabled,
+ long minWalArchiveSize,
+ long maxWalArchiveSize
+ ) {
segmentArchivedStorage = new SegmentArchivedStorage(segmentLockStorage);
segmentCurrStateStorage = new SegmentCurrentStateStorage(walSegmentsCnt);
segmentCompressStorage = new SegmentCompressStorage(log, compactionEnabled);
- archiveSizeStorage = new SegmentArchiveSizeStorage();
+ archiveSizeStorage = new SegmentArchiveSizeStorage(
+ log,
+ minWalArchiveSize,
+ maxWalArchiveSize,
+ reservationStorage
+ );
+
truncateStorage = new SegmentTruncateStorage();
segmentArchivedStorage.addObserver(segmentCurrStateStorage::onSegmentArchived);
@@ -318,22 +335,13 @@ public class SegmentAware {
}
/**
- * Adding current WAL archive size in bytes.
- *
- * @param size Size in bytes.
- */
- public void addCurrentWalArchiveSize(long size) {
- archiveSizeStorage.addCurrentSize(size);
- }
-
- /**
- * Adding reserved WAL archive size in bytes.
- * Defines a hint to determine if the maximum size is exceeded before a new segment is archived.
+ * Adding the WAL segment size in the archive.
*
- * @param size Size in bytes.
+ * @param idx Absolut segment index.
+ * @param sizeChange Segment size in bytes.
*/
- public void addReservedWalArchiveSize(long size) {
- archiveSizeStorage.addReservedSize(size);
+ public void addSize(long idx, long sizeChange) {
+ archiveSizeStorage.changeSize(idx, sizeChange);
}
/**
@@ -344,8 +352,8 @@ public class SegmentAware {
}
/**
- * Waiting for exceeding the maximum WAL archive size. To track size of WAL archive,
- * need to use {@link #addCurrentWalArchiveSize} and {@link #addReservedWalArchiveSize}.
+ * Waiting for exceeding the maximum WAL archive size.
+ * To track size of WAL archive, need to use {@link #addSize}.
*
* @param max Maximum WAL archive size in bytes.
* @throws IgniteInterruptedCheckedException If it was interrupted.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
index 1f0979a..ee81bff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
@@ -28,12 +28,12 @@ import org.jetbrains.annotations.Nullable;
*/
class SegmentReservationStorage extends SegmentObservable {
/**
- * Maps absolute segment index to reservation counter. If counter > 0 then we wouldn't delete all segments which has
- * index >= reserved segment index. Guarded by {@code this}.
+ * Maps absolute segment index to reservation counter. Guarded by {@code this}.
+ * If counter > 0 then we wouldn't delete all segments which has index >= reserved segment index.
*/
private final NavigableMap<Long, Integer> reserved = new TreeMap<>();
- /** Maximum segment index that can be reserved. */
+ /** Maximum segment index that can be reserved. Guarded by {@code this}. */
private long minReserveIdx = -1;
/**
@@ -71,6 +71,8 @@ class SegmentReservationStorage extends SegmentObservable {
}
/**
+ * Segment release.
+ *
* @param absIdx Reserved index.
*/
void release(long absIdx) {
@@ -122,4 +124,33 @@ class SegmentReservationStorage extends SegmentObservable {
return Objects.equals(oldMin, newMin) ? null : newMin == null ? -1 : newMin;
}
+
+ /**
+ * Forces the release of reserved segments.
+ * Also increases minimum segment index that can be reserved.
+ *
+ * @param absIdx Absolute segment index up (and including) to which the
+ * segments will be released, and it will also not be possible to reserve segments.
+ */
+ void forceRelease(long absIdx) {
+ Long minReservedIdx;
+
+ synchronized (this) {
+ minReservedIdx = trackingMinReservedIdx(reserved -> reserved.headMap(absIdx, true).clear());
+
+ minReserveIdx = Math.max(minReserveIdx, absIdx);
+ }
+
+ if (minReservedIdx != null)
+ notifyObservers(minReservedIdx);
+ }
+
+ /**
+ * Getting maximum segment index that can be reserved.
+ *
+ * @return Absolute segment index.
+ */
+ synchronized long minReserveIdx() {
+ return minReserveIdx;
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
index 08a3741..1652d35 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
@@ -18,16 +18,20 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.logger.NullLogger;
-import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
+import static org.apache.ignite.configuration.DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -44,12 +48,12 @@ public class SegmentAwareTest {
*/
@Test
public void testAvoidDeadlockArchiverAndLockStorage() throws IgniteCheckedException {
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
int iterationCnt = 100_000;
int segmentToHandle = 1;
- IgniteInternalFuture archiverThread = GridTestUtils.runAsync(() -> {
+ IgniteInternalFuture archiverThread = runAsync(() -> {
int i = iterationCnt;
while (i-- > 0) {
@@ -62,7 +66,7 @@ public class SegmentAwareTest {
}
});
- IgniteInternalFuture lockerThread = GridTestUtils.runAsync(() -> {
+ IgniteInternalFuture lockerThread = runAsync(() -> {
int i = iterationCnt;
while (i-- > 0) {
@@ -81,7 +85,7 @@ public class SegmentAwareTest {
@Test
public void testFinishAwaitSegment_WhenExactWaitingSegmentWasSet() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
@@ -98,7 +102,7 @@ public class SegmentAwareTest {
@Test
public void testFinishAwaitSegment_WhenGreaterThanWaitingSegmentWasSet() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
@@ -115,7 +119,7 @@ public class SegmentAwareTest {
@Test
public void testFinishAwaitSegment_WhenNextSegmentEqualToWaitingOne() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
@@ -138,7 +142,7 @@ public class SegmentAwareTest {
@Test
public void testFinishAwaitSegment_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
@@ -155,7 +159,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitSegmentForArchive_WhenWorkSegmentIncremented() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
aware.curAbsWalIdx(5);
aware.setLastArchivedAbsoluteIndex(4);
@@ -175,7 +179,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitSegmentForArchive_WhenWorkSegmentGreaterValue() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
aware.curAbsWalIdx(5);
aware.setLastArchivedAbsoluteIndex(4);
@@ -195,7 +199,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitSegmentForArchive_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
aware.curAbsWalIdx(5);
aware.setLastArchivedAbsoluteIndex(4);
@@ -215,7 +219,7 @@ public class SegmentAwareTest {
@Test
public void testCorrectCalculateNextSegmentIndex() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
aware.curAbsWalIdx(5);
@@ -232,7 +236,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitNextAbsoluteIndex_WhenMarkAsArchivedFirstSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(2, false, new NullLogger());
+ SegmentAware aware = segmentAware(2);
aware.curAbsWalIdx(1);
aware.setLastArchivedAbsoluteIndex(-1);
@@ -252,7 +256,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitNextAbsoluteIndex_WhenSetToArchivedFirst() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(2, false, new NullLogger());
+ SegmentAware aware = segmentAware(2);
aware.curAbsWalIdx(1);
aware.setLastArchivedAbsoluteIndex(-1);
@@ -272,7 +276,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitNextAbsoluteIndex_WhenOnlyForceInterruptWasCall() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(2, false, new NullLogger());
+ SegmentAware aware = segmentAware(2);
aware.curAbsWalIdx(2);
aware.setLastArchivedAbsoluteIndex(-1);
@@ -298,7 +302,7 @@ public class SegmentAwareTest {
@Test
public void testFinishSegmentArchived_WhenSetExactWaitingSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5));
@@ -315,7 +319,7 @@ public class SegmentAwareTest {
@Test
public void testFinishSegmentArchived_WhenMarkExactWaitingSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5));
@@ -332,7 +336,7 @@ public class SegmentAwareTest {
@Test
public void testFinishSegmentArchived_WhenSetGreaterThanWaitingSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5));
@@ -349,7 +353,7 @@ public class SegmentAwareTest {
@Test
public void testFinishSegmentArchived_WhenMarkGreaterThanWaitingSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5));
@@ -366,7 +370,7 @@ public class SegmentAwareTest {
@Test
public void testFinishSegmentArchived_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
aware.curAbsWalIdx(5);
aware.setLastArchivedAbsoluteIndex(4);
@@ -386,7 +390,7 @@ public class SegmentAwareTest {
@Test
public void testMarkAsMovedToArchive_WhenReleaseLockedSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
assertTrue(aware.lock(5));
@@ -405,7 +409,7 @@ public class SegmentAwareTest {
@Test
public void testMarkAsMovedToArchive_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
assertTrue(aware.lock(5));
@@ -427,7 +431,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitSegmentToCompress_WhenSetLastArchivedSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, true, new NullLogger());
+ SegmentAware aware = segmentAware(10, true);
aware.onSegmentCompressed(5);
@@ -446,7 +450,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitSegmentToCompress_WhenMarkLastArchivedSegment() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, true, new NullLogger());
+ SegmentAware aware = segmentAware(10, true);
aware.onSegmentCompressed(5);
@@ -464,7 +468,7 @@ public class SegmentAwareTest {
*/
@Test
public void testCorrectCalculateNextCompressSegment() throws IgniteCheckedException, InterruptedException {
- SegmentAware aware = new SegmentAware(10, true, new NullLogger());
+ SegmentAware aware = segmentAware(10, true);
aware.setLastArchivedAbsoluteIndex(6);
@@ -478,7 +482,7 @@ public class SegmentAwareTest {
@Test
public void testFinishWaitSegmentToCompress_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, true, new NullLogger());
+ SegmentAware aware = segmentAware(10, true);
aware.onSegmentCompressed(5);
IgniteInternalFuture future = awaitThread(aware::waitNextSegmentToCompress);
@@ -495,7 +499,7 @@ public class SegmentAwareTest {
*/
@Test
public void testLastCompressedIdxProperOrdering() throws IgniteInterruptedCheckedException {
- SegmentAware aware = new SegmentAware(10, true, new NullLogger());
+ SegmentAware aware = segmentAware(10, true);
for (int i = 0; i < 5; i++) {
aware.setLastArchivedAbsoluteIndex(i);
@@ -520,7 +524,7 @@ public class SegmentAwareTest {
@Test
public void testReserveCorrectly() {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
// Set limits.
aware.curAbsWalIdx(10);
@@ -563,15 +567,14 @@ public class SegmentAwareTest {
}
/**
- * Shouldn't fail when release unreserved segment.
+ * Check that there will be no error if a non-reserved segment is released.
*/
@Test
public void testReleaseUnreservedSegment() {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
aware.reserve(5);
-
aware.release(7);
}
@@ -581,7 +584,7 @@ public class SegmentAwareTest {
@Test
public void testReserveWorkSegmentCorrectly() {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
//when: lock one segment twice.
assertTrue(aware.lock(5));
@@ -615,7 +618,7 @@ public class SegmentAwareTest {
@Test
public void testAssertFail_WhenReleaseUnreservedWorkSegment() {
//given: thread which awaited segment.
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
assertTrue(aware.lock(5));
try {
@@ -633,7 +636,7 @@ public class SegmentAwareTest {
*/
@Test
public void testReservationBorder() {
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
assertTrue(aware.reserve(0));
assertTrue(aware.reserve(1));
@@ -655,7 +658,7 @@ public class SegmentAwareTest {
*/
@Test
public void testLockBorder() {
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
assertTrue(aware.lock(0));
assertTrue(aware.lock(1));
@@ -679,30 +682,30 @@ public class SegmentAwareTest {
*/
@Test
public void testWalArchiveSize() throws Exception {
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
IgniteInternalFuture<?> fut = awaitThread(() -> aware.awaitExceedMaxArchiveSize(10));
- aware.addCurrentWalArchiveSize(4);
+ aware.addSize(0, 4);
assertFutureIsNotFinish(fut);
- aware.addReservedWalArchiveSize(4);
+ aware.addSize(0, 4);
assertFutureIsNotFinish(fut);
- aware.addCurrentWalArchiveSize(4);
+ aware.addSize(0, 4);
fut.get(20);
aware.resetWalArchiveSizes();
fut = awaitThread(() -> aware.awaitExceedMaxArchiveSize(10));
- aware.addCurrentWalArchiveSize(4);
+ aware.addSize(1, 4);
assertFutureIsNotFinish(fut);
- aware.addReservedWalArchiveSize(4);
+ aware.addSize(1, 4);
assertFutureIsNotFinish(fut);
- aware.addReservedWalArchiveSize(4);
+ aware.addSize(1, 4);
fut.get(20);
aware.resetWalArchiveSizes();
@@ -727,7 +730,7 @@ public class SegmentAwareTest {
*/
@Test
public void testTruncate() throws Exception {
- SegmentAware aware = new SegmentAware(10, false, new NullLogger());
+ SegmentAware aware = segmentAware(10);
IgniteInternalFuture<?> fut = awaitThread(aware::awaitAvailableTruncateArchive);
@@ -768,6 +771,165 @@ public class SegmentAwareTest {
}
/**
+ * Checking the correct calculation of the WAL archive size for an unlimited WAL archive.
+ */
+ @Test
+ public void testArchiveSizeForUnlimitedWalArchive() {
+ SegmentAware aware = segmentAware(1, false, 0, UNLIMITED_WAL_ARCHIVE);
+ SegmentArchiveSizeStorage sizeStorage = archiveSizeStorage(aware);
+
+ aware.addSize(0, 10);
+
+ assertEquals(10, sizeStorage.currentSize());
+ assertNull(sizeStorage.segmentSize(0));
+
+ aware.addSize(0, 20);
+
+ assertEquals(30, sizeStorage.currentSize());
+ assertNull(sizeStorage.segmentSize(0));
+
+ aware.addSize(1, 10);
+
+ assertEquals(40, sizeStorage.currentSize());
+ assertNull(sizeStorage.segmentSize(0));
+ assertNull(sizeStorage.segmentSize(1));
+
+ aware.addSize(0, -10);
+
+ assertEquals(30, sizeStorage.currentSize());
+ assertNull(sizeStorage.segmentSize(0));
+ assertNull(sizeStorage.segmentSize(1));
+
+ aware.addSize(1, -10);
+
+ assertEquals(20, sizeStorage.currentSize());
+ assertNull(sizeStorage.segmentSize(0));
+ assertNull(sizeStorage.segmentSize(1));
+
+ aware.addSize(0, -20);
+
+ assertEquals(0, sizeStorage.currentSize());
+ assertNull(sizeStorage.segmentSize(0));
+ assertNull(sizeStorage.segmentSize(1));
+ }
+
+ /**
+ * Checking the correct calculation of the WAL archive size for a limited WAL archive.
+ */
+ @Test
+ public void testArchiveSizeForLimitedWalArchive() {
+ SegmentAware aware = segmentAware(1, false, 100, 200);
+ SegmentArchiveSizeStorage sizeStorage = archiveSizeStorage(aware);
+
+ aware.addSize(0, 10);
+
+ assertEquals(10, sizeStorage.currentSize());
+ assertEquals(Long.valueOf(10), sizeStorage.segmentSize(0));
+
+ aware.addSize(0, 20);
+
+ assertEquals(30, sizeStorage.currentSize());
+ assertEquals(Long.valueOf(30), sizeStorage.segmentSize(0));
+
+ aware.addSize(1, 5);
+
+ assertEquals(35, sizeStorage.currentSize());
+ assertEquals(Long.valueOf(30), sizeStorage.segmentSize(0));
+ assertEquals(Long.valueOf(5), sizeStorage.segmentSize(1));
+
+ aware.addSize(0, -5);
+
+ assertEquals(30, sizeStorage.currentSize());
+ assertEquals(Long.valueOf(25), sizeStorage.segmentSize(0));
+ assertEquals(Long.valueOf(5), sizeStorage.segmentSize(1));
+
+ aware.addSize(0, -10);
+
+ assertEquals(20, sizeStorage.currentSize());
+ assertEquals(Long.valueOf(15), sizeStorage.segmentSize(0));
+ assertEquals(Long.valueOf(5), sizeStorage.segmentSize(1));
+
+ aware.addSize(1, -3);
+
+ assertEquals(17, sizeStorage.currentSize());
+ assertEquals(Long.valueOf(15), sizeStorage.segmentSize(0));
+ assertEquals(Long.valueOf(2), sizeStorage.segmentSize(1));
+
+ aware.addSize(0, -15);
+
+ assertEquals(2, sizeStorage.currentSize());
+ assertNull(sizeStorage.segmentSize(0));
+ assertEquals(Long.valueOf(2), sizeStorage.segmentSize(1));
+
+ aware.addSize(1, -2);
+
+ assertEquals(0, sizeStorage.currentSize());
+ assertNull(sizeStorage.segmentSize(0));
+ assertNull(sizeStorage.segmentSize(1));
+ }
+
+ /**
+ * Checking that when the {@code SegmentArchiveSizeStorage#maxWalArchiveSize} is reached,
+ * the segments will be released to the {@code SegmentArchiveSizeStorage#minWalArchiveSize},
+ * and it will also not be possible to reserve them.
+ */
+ @Test
+ public void testReleaseSegmentsOnExceedMaxWalArchiveSize() {
+ SegmentAware aware = segmentAware(1, false, 50, 100);
+ SegmentReservationStorage reservationStorage = reservationStorage(aware);
+
+ for (int i = 0; i < 9; i++)
+ aware.addSize(i, 10);
+
+ assertTrue(aware.reserve(0));
+ assertTrue(aware.reserve(1));
+ assertTrue(aware.reserve(8));
+
+ aware.addSize(9, 10);
+
+ assertFalse(aware.reserved(0));
+ assertFalse(aware.reserved(1));
+ assertTrue(aware.reserved(8));
+
+ assertEquals(5, reservationStorage.minReserveIdx());
+
+ for (int i = 0; i <= 5; i++) {
+ assertFalse(aware.reserve(i));
+ assertFalse(aware.reserved(i));
+
+ assertTrue(aware.minReserveIndex(i));
+ }
+
+ for (int i = 6; i < 10; i++) {
+ assertTrue(aware.reserve(i));
+
+ assertFalse(aware.minReserveIndex(i));
+ }
+ }
+
+ /**
+ * Check that if the size of the segments does not reach the {@code SegmentArchiveSizeStorage#maxWalArchiveSize}
+ * then there will be no release of the segments.
+ */
+ @Test
+ public void testNoReleaseSegmentNearMaxWalArchiveSize() {
+ SegmentAware aware = segmentAware(1, false, 50, 100);
+
+ for (int i = 0; i < 9; i++)
+ aware.addSize(i, 10);
+
+ assertTrue(aware.reserve(0));
+ assertTrue(aware.reserve(1));
+ assertTrue(aware.reserve(8));
+
+ aware.addSize(9, 9);
+
+ assertTrue(aware.reserve(0));
+ assertTrue(aware.reserve(1));
+ assertTrue(aware.reserve(8));
+ }
+
+ /**
* Assert that future is still not finished.
*
* @param future Future to check.
@@ -791,7 +953,7 @@ public class SegmentAwareTest {
*/
private IgniteInternalFuture awaitThread(Waiter waiter) throws IgniteCheckedException, InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
- IgniteInternalFuture<Object> future = GridTestUtils.runAsync(
+ IgniteInternalFuture<Object> future = runAsync(
() -> {
latch.countDown();
try {
@@ -821,4 +983,71 @@ public class SegmentAwareTest {
*/
void await() throws IgniteInterruptedCheckedException;
}
+
+ /**
+ * Factory method for the {@link SegmentAware}.
+ *
+ * @param walSegmentsCnt Total WAL segments count.
+ * @return New instance.
+ */
+ private SegmentAware segmentAware(int walSegmentsCnt) {
+ return segmentAware(walSegmentsCnt, false);
+ }
+
+ /**
+ * Factory method for the {@link SegmentAware}.
+ *
+ * @param walSegmentsCnt Total WAL segments count.
+ * @param compactionEnabled Is wal compaction enabled.
+ * @return New instance.
+ */
+ private SegmentAware segmentAware(int walSegmentsCnt, boolean compactionEnabled) {
+ return segmentAware(walSegmentsCnt, compactionEnabled, UNLIMITED_WAL_ARCHIVE, UNLIMITED_WAL_ARCHIVE);
+ }
+
+ /**
+ * Factory method for the {@link SegmentAware}.
+ *
+ * @param walSegmentsCnt Total WAL segments count.
+ * @param compactionEnabled Is wal compaction enabled.
+ * @param minWalArchiveSize Minimum size of the WAL archive in bytes
+ * or {@link DataStorageConfiguration#UNLIMITED_WAL_ARCHIVE}.
+ * @param maxWalArchiveSize Maximum size of the WAL archive in bytes
+ * or {@link DataStorageConfiguration#UNLIMITED_WAL_ARCHIVE}.
+ * @return New instance.
+ */
+ private SegmentAware segmentAware(
+ int walSegmentsCnt,
+ boolean compactionEnabled,
+ long minWalArchiveSize,
+ long maxWalArchiveSize
+ ) {
+ return new SegmentAware(
+ new NullLogger(),
+ walSegmentsCnt,
+ compactionEnabled,
+ minWalArchiveSize,
+ maxWalArchiveSize
+ );
+ }
+
+ /**
+ * Getting {@code SegmentAware#archiveSizeStorage}.
+ *
+ * @param aware Segment aware.
+ * @return Instance of {@link SegmentArchiveSizeStorage}.
+ */
+ private SegmentArchiveSizeStorage archiveSizeStorage(SegmentAware aware) {
+ return getFieldValue(aware, "archiveSizeStorage");
+ }
+
+ /**
+ * Getting {@code SegmentAware#reservationStorage}.
+ *
+ * @param aware Segment aware.
+ * @return Instance of {@link SegmentReservationStorage}.
+ */
+ private SegmentReservationStorage reservationStorage(SegmentAware aware) {
+ return getFieldValue(aware, "reservationStorage");
+ }
}