You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/07/09 10:07:37 UTC
[ignite] branch master updated: IGNITE-15012 Adaptation of the
historical rebalance to the release of WAL segments. Fixes #9206
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new cb02f4d IGNITE-15012 Adaptation of the historical rebalance to the release of WAL segments. Fixes #9206
cb02f4d is described below
commit cb02f4dbb8d6efcde2917021c0016e6c29515a60
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Fri Jul 9 13:06:35 2021 +0300
IGNITE-15012 Adaptation of the historical rebalance to the release of WAL segments. Fixes #9206
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../pagemem/wal/IgniteWriteAheadLogManager.java | 3 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 26 +-
.../GridCacheDatabaseSharedManager.java | 139 ++++---
.../persistence/checkpoint/CheckpointHistory.java | 2 +-
.../checkpoint/CheckpointHistoryResult.java | 25 +-
.../wal/AbstractWalRecordsIterator.java | 3 +
.../wal/aware/SegmentReservationStorage.java | 13 +-
.../ReleaseSegmentOnHistoricalRebalanceTest.java | 408 +++++++++++++++++++++
.../IgniteWalIteratorExceptionDuringReadTest.java | 35 +-
.../db/wal/WalRecoveryTxLogicalRecordsTest.java | 24 +-
.../cache/persistence/pagemem/NoOpWALManager.java | 2 +-
.../persistence/wal/aware/SegmentAwareTest.java | 13 +-
.../ignite/testsuites/IgnitePdsTestSuite4.java | 2 +
13 files changed, 560 insertions(+), 135 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index a4ed599..eafd228 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -141,9 +141,8 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni
* Invoke this method to release WAL history since provided pointer that was previously reserved.
*
* @param start WAL pointer.
- * @throws IgniteException If failed to release.
*/
- public void release(WALPointer start) throws IgniteCheckedException;
+ public void release(WALPointer start);
/**
* Gives a hint to WAL manager to clear entries logged before the given pointer.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f1ec405..4b683fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -321,16 +321,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** */
@GridToStringExclude
- private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
+ private final IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
/** Set of nodes that cannot be used for wal rebalancing due to some reason. */
- private Set<UUID> exclusionsFromHistoricalRebalance = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final Set<UUID> exclusionsFromHistoricalRebalance = ConcurrentHashMap.newKeySet();
/**
* Set of nodes that cannot be used for full rebalancing due missed partitions.
* Mapping pair of groupId and nodeId to set of partitions.
*/
- private Map<T2<Integer, UUID>, Set<Integer>> exclusionsFromFullRebalance = new ConcurrentHashMap<>();
+ private final Map<T2<Integer, UUID>, Set<Integer>> exclusionsFromFullRebalance = new ConcurrentHashMap<>();
/** Reserved max available history for calculation of history supplier on coordinator. */
private volatile Map<Integer /** Group. */, Map<Integer /** Partition */, Long /** Counter. */>> partHistReserved;
@@ -576,11 +576,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param cntrSince Partition update counter since history supplying is requested.
* @return List of IDs of history supplier nodes or empty list if these doesn't exist.
*/
- @Nullable public List<UUID> partitionHistorySupplier(int grpId, int partId, long cntrSince) {
+ public List<UUID> partitionHistorySupplier(int grpId, int partId, long cntrSince) {
List<UUID> histSuppliers = partHistSuppliers.getSupplier(grpId, partId, cntrSince);
- return histSuppliers.stream().filter((supplier) -> !exclusionsFromHistoricalRebalance.contains(supplier))
- .collect(Collectors.toList());
+ histSuppliers.removeIf(exclusionsFromHistoricalRebalance::contains);
+
+ return histSuppliers;
}
/**
@@ -601,8 +602,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param p Partition id.
*/
public void markNodeAsInapplicableForFullRebalance(UUID nodeId, int grpId, int p) {
- Set<Integer> parts = exclusionsFromFullRebalance.computeIfAbsent(new T2<>(grpId, nodeId), t2 ->
- Collections.newSetFromMap(new ConcurrentHashMap<>())
+ Set<Integer> parts = exclusionsFromFullRebalance.computeIfAbsent(
+ new T2<>(grpId, nodeId), t2 -> ConcurrentHashMap.newKeySet()
);
parts.add(p);
@@ -2416,8 +2417,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
", wasRebalanced=" + wasRebalanced() + ']');
}
- assert res != null || err != null;
-
if (res != null) {
span.addTag(SpanTags.tag(SpanTags.RESULT, SpanTags.TOPOLOGY_VERSION, SpanTags.MAJOR),
() -> String.valueOf(res.topologyVersion()));
@@ -2521,9 +2520,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (localReserved != null) {
boolean success = cctx.database().reserveHistoryForPreloading(localReserved);
- // TODO: how to handle?
- if (!success)
- err = new IgniteCheckedException("Could not reserve history");
+ if (!success) {
+ log.warning("Could not reserve history for historical rebalance " +
+ "(possible it happened because WAL space is exhausted).");
+ }
}
cctx.database().releaseHistoryForExchange();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 5544244..4d08196 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -41,7 +41,6 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
@@ -306,11 +305,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** Lock wait time. */
private final long lockWaitTime;
- /** This is the earliest WAL pointer that was reserved during exchange and would release after exchange completed. */
- private WALPointer reservedForExchange;
+ /**
+ * This is the earliest WAL pointer that was reserved during exchange and will be released after the exchange completes.
+ * Guarded by {@code this}.
+ */
+ @Nullable private WALPointer reservedForExchange;
/** This is the earliest WAL pointer that was reserved during preloading. */
- private volatile WALPointer reservedForPreloading;
+ private final AtomicReference<WALPointer> reservedForPreloading = new AtomicReference<>();
/** Snapshot manager. */
private IgniteCacheSnapshotManager snapshotMgr;
@@ -343,9 +345,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** Page list cache limits per data region. */
private final Map<String, AtomicLong> pageListCacheLimits = new ConcurrentHashMap<>();
- /** Lock for releasing history for preloading. */
- private final ReentrantLock releaseHistForPreloadingLock = new ReentrantLock();
-
/** */
private CachePartitionDefragmentationManager defrgMgr;
@@ -1629,14 +1628,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
checkpointReadLock();
+ WALPointer reservedCheckpointMark;
+
try {
CheckpointHistoryResult checkpointHistoryResult =
checkpointHistory().searchAndReserveCheckpoints(applicableGroupsAndPartitions);
earliestValidCheckpoints = checkpointHistoryResult.earliestValidCheckpoints();
- if (checkpointHistoryResult.reservedCheckoint() != null)
- reservedForExchange = checkpointHistoryResult.reservedCheckoint().checkpointMark();
+ reservedForExchange = reservedCheckpointMark = checkpointHistoryResult.reservedCheckpointMark();
}
finally {
checkpointReadUnlock();
@@ -1656,8 +1656,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
int partId = e0.getKey();
- assert cctx.wal().reserved(cpEntry.checkpointMark())
- : "WAL segment for checkpoint " + cpEntry + " has not reserved";
+ if (reservedCheckpointMark != null && !cctx.wal().reserved(reservedCheckpointMark)) {
+ log.warning("Reservation failed because the segment was released: " + reservedCheckpointMark);
+
+ reservedForExchange = null;
+
+ grpPartsWithCnts.clear();
+
+ return grpPartsWithCnts;
+ }
try {
Long updCntr = cpEntry.partitionCounter(cctx.wal(), grpId, partId);
@@ -1752,20 +1759,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** {@inheritDoc} */
@Override public synchronized void releaseHistoryForExchange() {
- if (reservedForExchange == null)
- return;
-
- assert cctx.wal().reserved(reservedForExchange)
- : "Earliest checkpoint WAL pointer is not reserved for exchange: " + reservedForExchange;
-
- try {
+ if (reservedForExchange != null) {
cctx.wal().release(reservedForExchange);
- }
- catch (IgniteCheckedException e) {
- log.error("Failed to release earliest checkpoint WAL pointer: " + reservedForExchange, e);
- }
- reservedForExchange = null;
+ reservedForExchange = null;
+ }
}
/** {@inheritDoc} */
@@ -1777,8 +1775,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
WALPointer oldestWALPointerToReserve = null;
- for (GroupPartitionId key : entries.keySet()) {
- WALPointer ptr = entries.get(key).checkpointMark();
+ for (CheckpointEntry cpE : entries.values()) {
+ WALPointer ptr = cpE.checkpointMark();
if (ptr == null)
return false;
@@ -1788,7 +1786,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
if (cctx.wal().reserve(oldestWALPointerToReserve)) {
- reservedForPreloading = oldestWALPointerToReserve;
+ reservedForPreloading.set(oldestWALPointerToReserve);
return true;
}
@@ -1798,28 +1796,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** {@inheritDoc} */
@Override public void releaseHistoryForPreloading() {
- releaseHistForPreloadingLock.lock();
-
- try {
- if (reservedForPreloading != null) {
- cctx.wal().release(reservedForPreloading);
+ WALPointer prev = reservedForPreloading.getAndSet(null);
- reservedForPreloading = null;
- }
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Could not release WAL reservation", ex);
-
- throw new IgniteException(ex);
- }
- finally {
- releaseHistForPreloadingLock.unlock();
- }
+ if (prev != null)
+ cctx.wal().release(prev);
}
/** {@inheritDoc} */
@Override public WALPointer latestWalPointerReservedForPreloading() {
- return reservedForPreloading;
+ return reservedForPreloading.get();
}
/**
@@ -2151,7 +2136,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
AtomicLong applied = new AtomicLong();
try {
- while (it.hasNextX()) {
+ while (restoreBinaryState.hasNext()) {
if (applyError.get() != null)
break;
@@ -2642,7 +2627,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
final IgniteTxManager txManager = cctx.tm();
try {
- while (it.hasNextX()) {
+ while (restoreLogicalState.hasNext()) {
WALRecord rec = restoreLogicalState.next();
if (rec == null)
@@ -3425,9 +3410,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @return WALRecord entry.
* @throws IgniteCheckedException If CRC check fail during binary recovery state or another exception occurring.
*/
- public WALRecord next() throws IgniteCheckedException {
+ @Nullable public WALRecord next() throws IgniteCheckedException {
try {
- for (;;) {
+ for (; ; ) {
if (!iterator.hasNextX())
return null;
@@ -3444,7 +3429,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
// Filter out records by group id.
if (rec instanceof WalRecordCacheGroupAware) {
- WalRecordCacheGroupAware grpAwareRecord = (WalRecordCacheGroupAware) rec;
+ WalRecordCacheGroupAware grpAwareRecord = (WalRecordCacheGroupAware)rec;
if (!cacheGroupPredicate.apply(grpAwareRecord.groupId()))
continue;
@@ -3452,24 +3437,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
// Filter out data entries by group id.
if (rec instanceof DataRecord)
- rec = filterEntriesByGroupId((DataRecord) rec);
+ rec = filterEntriesByGroupId((DataRecord)rec);
return rec;
}
}
catch (IgniteCheckedException e) {
- boolean throwsCRCError = throwsCRCError();
-
- if (X.hasCause(e, IgniteDataIntegrityViolationException.class)) {
- if (throwsCRCError)
- throw e;
- else
- return null;
- }
+ IgniteCheckedException ex = throwsError(e);
- log.error("There is an error during restore state [throwsCRCError=" + throwsCRCError + ']', e);
-
- throw e;
+ if (ex != null)
+ throw ex;
+ else
+ return null;
}
}
@@ -3509,6 +3488,44 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
public boolean throwsCRCError() {
return lastReadRecordPointer().index() <= lastArchivedSegment;
}
+
+ /**
+ * Checks for more WALRecord entries.
+ *
+ * @return {@code True} if contains more WALRecord entries.
+ * @throws IgniteCheckedException If the CRC check fails during binary recovery or another exception occurs.
+ */
+ public boolean hasNext() throws IgniteCheckedException {
+ try {
+ return iterator.hasNextX();
+ }
+ catch (IgniteCheckedException e) {
+ IgniteCheckedException ex = throwsError(e);
+
+ if (ex != null)
+ throw ex;
+ else
+ return false;
+ }
+ }
+
+ /**
+ * Checks the need to throw an exception.
+ *
+ * @param e Thrown exception.
+ * @return Exception to be thrown.
+ */
+ @Nullable private IgniteCheckedException throwsError(IgniteCheckedException e) {
+ boolean throwsCRCError = throwsCRCError();
+
+ if (X.hasCause(e, IgniteDataIntegrityViolationException.class))
+ return throwsCRCError ? e : null;
+ else {
+ log.error("There is an error during restore state [throwsCRCError=" + throwsCRCError + ']', e);
+
+ return e;
+ }
+ }
}
/**
@@ -3542,7 +3559,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @return WALRecord entry.
* @throws IgniteCheckedException If CRC check fail during binary recovery state or another exception occurring.
*/
- @Override public WALRecord next() throws IgniteCheckedException {
+ @Override @Nullable public WALRecord next() throws IgniteCheckedException {
WALRecord rec = super.next();
if (rec == null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
index 784dd77..aa77a57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
@@ -621,7 +621,7 @@ public class CheckpointHistory {
* @param searchCntrMap Search map contains (Group Id, partition, counter).
* @return Map of group-partition on checkpoint entry or empty map if nothing found.
*/
- @Nullable public Map<GroupPartitionId, CheckpointEntry> searchCheckpointEntry(
+ public Map<GroupPartitionId, CheckpointEntry> searchCheckpointEntry(
Map<T2<Integer, Integer>, Long> searchCntrMap
) {
if (F.isEmpty(searchCntrMap))
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistoryResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistoryResult.java
index f9bcdef..e0fbbae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistoryResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistoryResult.java
@@ -15,38 +15,39 @@
* limitations under the License.
*/
-
package org.apache.ignite.internal.processors.cache.persistence.checkpoint;
import java.util.Map;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.util.typedef.T2;
+import org.jetbrains.annotations.Nullable;
/**
* Result of a checkpoint search and reservation.
*/
public class CheckpointHistoryResult {
-
/**
- * Map (groupId, Reason why reservation cannot be made deeper): Map (partitionId, earliest valid checkpoint to
- * history search)).
+ * Map (groupId, Reason why reservation cannot be made deeper):
+ * Map (partitionId, earliest valid checkpoint to history search)).
*/
private final Map<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> earliestValidCheckpoints;
/** Reserved checkpoint. */
- private final CheckpointEntry reservedCheckoint;
+ @Nullable private final CheckpointEntry reservedCheckpoint;
/**
* Constructor.
*
* @param earliestValidCheckpoints Map (groupId, Reason why reservation cannot be made deeper):
* Map (partitionId, earliest valid checkpoint to history search)).
- * @param reservedCheckoint Reserved checkpoint.
+ * @param reservedCheckpoint Reserved checkpoint.
*/
public CheckpointHistoryResult(
Map<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> earliestValidCheckpoints,
- CheckpointEntry reservedCheckoint) {
+ @Nullable CheckpointEntry reservedCheckpoint
+ ) {
this.earliestValidCheckpoints = earliestValidCheckpoints;
- this.reservedCheckoint = reservedCheckoint;
+ this.reservedCheckpoint = reservedCheckpoint;
}
/**
@@ -58,9 +59,11 @@ public class CheckpointHistoryResult {
}
/**
- * @return Reserved checkpoint.
+ * Returns the oldest reserved checkpoint marker.
+ *
+ * @return Checkpoint mark.
*/
- public CheckpointEntry reservedCheckoint() {
- return reservedCheckoint;
+ @Nullable public WALPointer reservedCheckpointMark() {
+ return reservedCheckpoint == null ? null : reservedCheckpoint.checkpointMark();
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index 5bf7d39..0ff2694 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -143,6 +143,9 @@ public abstract class AbstractWalRecordsIterator
/** {@inheritDoc} */
@Override protected boolean onHasNext() throws IgniteCheckedException {
+ if (curException != null)
+ throw curException;
+
return curRec != null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
index 453cc17..1f0979a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
@@ -77,16 +77,9 @@ class SegmentReservationStorage extends SegmentObservable {
Long minReservedIdx;
synchronized (this) {
- minReservedIdx = trackingMinReservedIdx(reserved -> {
- Integer cur = reserved.get(absIdx);
-
- assert cur != null && cur >= 1 : "cur=" + cur + ", absIdx=" + absIdx;
-
- if (cur == 1)
- reserved.remove(absIdx);
- else
- reserved.put(absIdx, cur - 1);
- });
+ minReservedIdx = trackingMinReservedIdx(
+ reserved -> reserved.computeIfPresent(absIdx, (i, cnt) -> cnt == 1 ? null : cnt - 1)
+ );
}
if (minReservedIdx != null)
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/ReleaseSegmentOnHistoricalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/cache/ReleaseSegmentOnHistoricalRebalanceTest.java
new file mode 100644
index 0000000..cbd476b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/ReleaseSegmentOnHistoricalRebalanceTest.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import java.lang.reflect.Method;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistoryResult;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMarkersStorage;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Testing the release of WAL segments during historical rebalance.
+ */
+@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+public class ReleaseSegmentOnHistoricalRebalanceTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setFailureHandler(new StopNodeFailureHandler())
+ .setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setWalSegmentSize((int)(2 * U.MB))
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
+ ).setCacheConfiguration(
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setAffinity(new RendezvousAffinityFunction(false, 2))
+ .setBackups(1)
+ );
+ }
+
+ /**
+ * Checks that if the segment is released after {@link CheckpointHistory#searchAndReserveCheckpoints},
+ * there will be no errors and the rebalance will be completed.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReleaseSegmentAfterSearchAndReserveCheckpoints() throws Exception {
+ checkHistoricalRebalance(n -> {
+ CheckpointHistory spy = spy(dbMgr(n).checkpointHistory());
+
+ when(spy.searchAndReserveCheckpoints(any())).thenAnswer(m -> {
+ CheckpointHistoryResult res = (CheckpointHistoryResult)m.callRealMethod();
+
+ WALPointer reserved = res.reservedCheckpointMark();
+ assertNotNull(reserved);
+
+ release(n, reserved);
+
+ return res;
+ });
+
+ checkpointHistory(n, spy);
+ });
+ }
+
+ /**
+ * Checks that if the segment is released before {@link GridCacheDatabaseSharedManager#releaseHistoryForExchange},
+ * there will be no errors and the rebalance will be completed.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReleaseBeforeReleaseHistoryForExchange() throws Exception {
+ checkHistoricalRebalance(n -> {
+ GridCacheDatabaseSharedManager spy = spy(dbMgr(n));
+
+ doAnswer(m -> {
+ release(n, getFieldValue(spy, "reservedForExchange"));
+
+ return m.callRealMethod();
+ }).when(spy).releaseHistoryForExchange();
+
+ databaseManager(n, spy);
+ });
+ }
+
+ /**
+ * Checks that if there is no segment reservation in {@link GridCacheDatabaseSharedManager#reserveHistoryForPreloading},
+ * there will be no errors and the rebalance will be completed.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNoReserveHistoryForPreloading() throws Exception {
+ checkHistoricalRebalance(n -> {
+ GridCacheDatabaseSharedManager spy = spy(dbMgr(n));
+
+ when(spy.reserveHistoryForPreloading(any())).thenAnswer(m -> false);
+
+ databaseManager(n, spy);
+ });
+ }
+
+ /**
+ * Checks that if the segment is released before {@link GridCacheDatabaseSharedManager#releaseHistoryForPreloading},
+ * there will be no errors and the rebalance will be completed.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReleaseBeforeReleaseHistoryForPreloading() throws Exception {
+ checkHistoricalRebalance(n -> {
+ GridCacheDatabaseSharedManager spy = spy(dbMgr(n));
+
+ doAnswer(m -> {
+ release(n, spy.latestWalPointerReservedForPreloading());
+
+ return m.callRealMethod();
+ }).when(spy).releaseHistoryForPreloading();
+
+ databaseManager(n, spy);
+ });
+ }
+
+ /**
+ * Checks that if the segment is released before {@link IgniteCacheOffheapManagerImpl#rebalanceIterator},
+ * there will be no errors and the rebalance will be completed.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReleaseBeforeRebalanceIterator() throws Exception {
+ checkHistoricalRebalance(n -> {
+ IgniteInternalCache<?, ?> cachex = n.cachex(DEFAULT_CACHE_NAME);
+
+ GridCacheOffheapManager spy = spy(offheapManager(cachex));
+
+ doAnswer(m -> {
+ CheckpointHistory cpHist = dbMgr(n).checkpointHistory();
+
+ for (Long cp : cpHist.checkpoints())
+ release(n, entry(cpHist, cp).checkpointMark());
+
+ return m.callRealMethod();
+ }).when(spy).rebalanceIterator(any(), any());
+
+ offheapManager(cachex, spy);
+ });
+ }
+
+ /**
+ * Checks that if the segment is released during {@link IgniteCacheOffheapManagerImpl#rebalanceIterator},
+ * there will be no errors and the rebalance will be completed.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReleaseDuringRebalanceIterator() throws Exception {
+ checkHistoricalRebalance(n -> {
+ IgniteInternalCache<?, ?> cachex = n.cachex(DEFAULT_CACHE_NAME);
+
+ GridCacheOffheapManager spy = spy(offheapManager(cachex));
+
+ doAnswer(m -> {
+ WALPointer reserved = dbMgr(n).latestWalPointerReservedForPreloading();
+
+ assertNotNull(reserved);
+
+ Object o = m.callRealMethod();
+
+ release(n, reserved);
+
+ assertTrue(segmentAware(n).minReserveIndex(Long.MAX_VALUE));
+
+ return o;
+ }).when(spy).rebalanceIterator(any(), any());
+
+ offheapManager(cachex, spy);
+ });
+ }
+
+ /**
+ * Checks that if the reservation is released immediately,
+ * there will be no errors and the rebalance will be completed.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testImmediateReleaseSegment() throws Exception {
+ checkHistoricalRebalance(n -> {
+ SegmentAware spy = spy(segmentAware(n));
+
+ doAnswer(m -> {
+ Object o = m.callRealMethod();
+
+ spy.release(m.getArgument(0));
+
+ return o;
+ }).when(spy).reserve(anyLong());
+
+ segmentAware(n, spy);
+ });
+ }
+
+ /**
+ * Checks that if there is no reservation,
+ * there will be no errors and the rebalance will be completed.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNoReserveSegment() throws Exception {
+ checkHistoricalRebalance(n -> {
+ SegmentAware spy = spy(segmentAware(n));
+
+ when(spy.reserve(anyLong())).thenAnswer(m -> false);
+
+ segmentAware(n, spy);
+ });
+ }
+
+ /**
+ * Populates the given cache and forces a new checkpoint every 100 updates.
+ *
+ * @param cache Cache.
+ * @param cnt Entry count.
+ * @param o Key offset.
+ * @throws Exception If failed.
+ */
+ private void populate(IgniteCache<Integer, ? super Object> cache, int cnt, int o) throws Exception {
+ for (int i = 0; i < cnt; i++) {
+ if (i % 100 == 0)
+ forceCheckpoint();
+
+ cache.put(i + o, new byte[64 * 1024]);
+ }
+ }
+
+ /**
+ * Sets the spy to {@code CheckpointMarkersStorage#cpHistory}.
+ *
+ * @param n Node.
+ * @param spy Spy.
+ */
+ private void checkpointHistory(IgniteEx n, CheckpointHistory spy) {
+ CheckpointMarkersStorage s = getFieldValue(dbMgr(n), "checkpointManager", "checkpointMarkersStorage");
+
+ setFieldValue(s, "cpHistory", spy);
+ }
+
+ /**
+ * Sets the spy to {@code GridCacheSharedContext#dbMgr}.
+ *
+ * @param n Node.
+ * @param spy Spy.
+ */
+ private void databaseManager(IgniteEx n, GridCacheDatabaseSharedManager spy) {
+ setFieldValue(n.context().cache().context(), "dbMgr", spy);
+ }
+
+ /**
+ * Sets the spy to {@code CacheGroupContext#offheapMgr}.
+ *
+ * @param cache Cache.
+ * @param spy Spy.
+ */
+ private void offheapManager(IgniteInternalCache<? ,?> cache, GridCacheOffheapManager spy) {
+ setFieldValue(cache.context().group(), "offheapMgr", spy);
+ }
+
+ /**
+ * Sets the spy to {@code FileWriteAheadLogManager#segmentAware}.
+ *
+ * @param n Node.
+ * @param spy Spy.
+ */
+ private void segmentAware(IgniteEx n, SegmentAware spy) {
+ setFieldValue(walMgr(n), "segmentAware", spy);
+ }
+
+ /**
+ * Releases WAL segment.
+ *
+ * @param n Node.
+ * @param reserved Reserved segment.
+ */
+ private void release(IgniteEx n, @Nullable WALPointer reserved) {
+ while (reserved != null && walMgr(n).reserved(reserved))
+ walMgr(n).release(reserved);
+ }
+
+ /**
+ * Returns an instance of {@link SegmentAware} for the given ignite node.
+ *
+ * @return Segment aware.
+ */
+ private SegmentAware segmentAware(IgniteEx n) {
+ return getFieldValue(walMgr(n), "segmentAware");
+ }
+
+ /**
+ * Returns an instance of {@link GridCacheOffheapManager} for the given ignite node.
+ *
+ * @param cache Cache.
+ * @return Offheap manager.
+ */
+ private GridCacheOffheapManager offheapManager(IgniteInternalCache<?, ?> cache) {
+ return (GridCacheOffheapManager)cache.context().group().offheap();
+ }
+
+ /**
+ * Invokes the {@code CheckpointHistory#entry}.
+ *
+ * @param cpHist Checkpoint history.
+ * @param cpTs Checkpoint timestamp.
+ * @return Checkpoint entry.
+ */
+ private CheckpointEntry entry(CheckpointHistory cpHist, Long cpTs) throws Exception {
+ Method entry = U.getNonPublicMethod(cpHist.getClass(), "entry", cpTs.getClass());
+
+ return (CheckpointEntry)entry.invoke(cpHist, cpTs);
+ }
+
+ /**
+ * Checks that the historical rebalance will not fail the nodes.
+ *
+ * @param c Closure to be performed before the node returns to the topology, argument is a running node.
+ * @throws Exception If failed.
+ */
+ private void checkHistoricalRebalance(IgniteThrowableConsumer<IgniteEx> c) throws Exception {
+ IgniteEx n0 = startGrids(2);
+
+ n0.cluster().state(ACTIVE);
+ awaitPartitionMapExchange();
+
+ populate(n0.cache(DEFAULT_CACHE_NAME), 1_000, 0);
+
+ stopGrid(1);
+
+ populate(n0.cache(DEFAULT_CACHE_NAME), 1_000, 1_000);
+
+ c.accept(n0);
+
+ IgniteEx n1 = startGrid(1);
+ awaitPartitionMapExchange();
+
+ assertEquals(2, G.allGrids().size());
+
+ stopGrid(0);
+ awaitPartitionMapExchange();
+
+ for (int i = 0; i < 2_000; i++)
+ assertNotNull(String.valueOf(i), n1.cache(DEFAULT_CACHE_NAME).get(i));
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorExceptionDuringReadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorExceptionDuringReadTest.java
index ea27743..c97d479 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorExceptionDuringReadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorExceptionDuringReadTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -75,53 +76,53 @@ public class IgniteWalIteratorExceptionDuringReadTest extends GridCommonAbstract
*/
@Test
public void test() throws Exception {
- IgniteEx ig = (IgniteEx)startGrid();
+ IgniteEx ig = startGrid();
- ig.cluster().active(true);
+ ig.cluster().state(ClusterState.ACTIVE);
IgniteCache<Integer, byte[]> cache = ig.cache(DEFAULT_CACHE_NAME);
for (int i = 0; i < 20 * 4; i++)
cache.put(i, new byte[1024 * 1024]);
- ig.cluster().active(false);
+ ig.cluster().state(ClusterState.INACTIVE);
- IgniteWalIteratorFactory iteratorFactory = new IgniteWalIteratorFactory(log);
+ IgniteWalIteratorFactory iterFactory = new IgniteWalIteratorFactory(log);
WALPointer failOnPtr = new WALPointer(3, 1024 * 1024 * 5, 0);
- String failMessage = "test fail message";
+ String failMsg = "test fail message";
IteratorParametersBuilder builder = new IteratorParametersBuilder()
.filesOrDirs(U.defaultWorkDirectory())
.filter((r, ptr) -> {
if (ptr.compareTo(failOnPtr) >= 0)
- throw new TestRuntimeException(failMessage);
+ throw new TestRuntimeException(failMsg);
return true;
});
- try (WALIterator it = iteratorFactory.iterator(builder)) {
+ try (WALIterator it = iterFactory.iterator(builder)) {
WALPointer ptr = null;
boolean failed = false;
- while (it.hasNext()) {
- try {
+ try {
+ while (it.hasNext()) {
+
IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
ptr = tup.get1();
- }
- catch (IgniteException e) {
- Assert.assertNotNull(ptr);
- Assert.assertEquals(failOnPtr.index(), ptr.index());
- Assert.assertTrue(ptr.compareTo(failOnPtr) < 0);
-
- failed = X.hasCause(e, TestRuntimeException.class);
- break;
}
}
+ catch (IgniteException e) {
+ Assert.assertNotNull(ptr);
+ Assert.assertEquals(failOnPtr.index(), ptr.index());
+ Assert.assertTrue(ptr.compareTo(failOnPtr) < 0);
+
+ failed = X.hasCause(e, TestRuntimeException.class);
+ }
assertTrue(failed);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
index 50e4dae..6b3cf37 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -76,6 +77,8 @@ import org.apache.ignite.transactions.Transaction;
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+
/**
*
*/
@@ -557,7 +560,9 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
WALPointer oldestPtr = cpHist.firstCheckpointPointer();
- GridTestUtils.setFieldValue(cctx.database(), "reservedForPreloading", oldestPtr);
+ AtomicReference<WALPointer> preloading = getFieldValue(cctx.database(), "reservedForPreloading");
+
+ preloading.set(oldestPtr);
cctx.wal().reserve(oldestPtr);
@@ -569,10 +574,11 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
*
* @param cctx Cache shared context.
* @param ptr WAL pointer to release.
- * @throws IgniteCheckedException If the release failed.
*/
- private void releaseWalPointerForIterator(GridCacheSharedContext cctx, WALPointer ptr) throws IgniteCheckedException {
- GridTestUtils.setFieldValue(cctx.database(), "reservedForPreloading", null);
+ private void releaseWalPointerForIterator(GridCacheSharedContext cctx, WALPointer ptr) {
+ AtomicReference<WALPointer> preloading = getFieldValue(cctx.database(), "reservedForPreloading");
+
+ preloading.set(null);
cctx.wal().release(ptr);
}
@@ -1038,8 +1044,8 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
private T2<long[], Integer> getReuseListData(Ignite ignite, String cacheName) {
GridCacheContext ctx = ((IgniteEx)ignite).context().cache().cache(cacheName).context();
- ReuseListImpl reuseList = GridTestUtils.getFieldValue(ctx.offheap(), "reuseList");
- PagesList.Stripe[] bucket = GridTestUtils.getFieldValue(reuseList, "bucket");
+ ReuseListImpl reuseList = getFieldValue(ctx.offheap(), "reuseList");
+ PagesList.Stripe[] bucket = getFieldValue(reuseList, "bucket");
long[] ids = null;
@@ -1050,7 +1056,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
ids[i] = bucket[i].tailId;
}
- AtomicLongArray bucketsSize = GridTestUtils.getFieldValue(reuseList, PagesList.class, "bucketsSize");
+ AtomicLongArray bucketsSize = getFieldValue(reuseList, PagesList.class, "bucketsSize");
assertEquals(1, bucketsSize.length());
return new T2<>(ids, (int)bucketsSize.get(0));
@@ -1126,10 +1132,10 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
// Flush free-list onheap cache to page memory.
freeList.saveMetadata(IoStatisticsHolderNoOp.INSTANCE);
- AtomicReferenceArray<PagesList.Stripe[]> buckets = GridTestUtils.getFieldValue(freeList,
+ AtomicReferenceArray<PagesList.Stripe[]> buckets = getFieldValue(freeList,
AbstractFreeList.class, "buckets");
- AtomicLongArray bucketsSize = GridTestUtils.getFieldValue(freeList, PagesList.class, "bucketsSize");
+ AtomicLongArray bucketsSize = getFieldValue(freeList, PagesList.class, "bucketsSize");
assertNotNull(buckets);
assertNotNull(bucketsSize);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index f571d6b..29a9fc7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -93,7 +93,7 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager {
}
/** {@inheritDoc} */
- @Override public void release(WALPointer start) throws IgniteCheckedException {
+ @Override public void release(WALPointer start) {
// No-op.
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
index 18291c4..08a3741 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
@@ -563,23 +563,16 @@ public class SegmentAwareTest {
}
/**
- * Should fail when release unreserved segment.
+ * Shouldn't fail when releasing an unreserved segment.
*/
@Test
- public void testAssertFail_WhenReleaseUnreservedSegment() {
+ public void testReleaseUnreservedSegment() {
//given: thread which awaited segment.
SegmentAware aware = new SegmentAware(10, false, new NullLogger());
aware.reserve(5);
- try {
-
- aware.release(7);
- }
- catch (AssertionError e) {
- return;
- }
- fail("Should fail with AssertError because this segment have not reserved");
+ aware.release(7);
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 6c417d9..37983c1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cache.NotOptimizedRebalanceTest;
import org.apache.ignite.cache.RebalanceAfterResettingLostPartitionTest;
import org.apache.ignite.cache.RebalanceCancellationTest;
import org.apache.ignite.cache.RebalanceCompleteDuringExchangeTest;
+import org.apache.ignite.cache.ReleaseSegmentOnHistoricalRebalanceTest;
import org.apache.ignite.cache.ResetLostPartitionTest;
import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse;
import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
@@ -107,6 +108,7 @@ public class IgnitePdsTestSuite4 {
GridTestUtils.addTestIfNeeded(suite, IgnitePdsRemoveDuringRebalancingTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsSpuriousRebalancingOnNodeJoinTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, RebalanceCompleteDuringExchangeTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, ReleaseSegmentOnHistoricalRebalanceTest.class, ignoredTests);
// Page lock tracker tests.
GridTestUtils.addTestIfNeeded(suite, PageLockTrackerManagerTest.class, ignoredTests);