Posted to commits@ignite.apache.org by ir...@apache.org on 2019/06/21 18:56:54 UTC

[ignite] branch master updated: IGNITE-11867 Fix flaky test GridCacheRebalancingWithAsyncClearingTest#testCorrectRebalancingCurrentlyRentingPartitions

This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new ecec080  IGNITE-11867 Fix flaky test GridCacheRebalancingWithAsyncClearingTest#testCorrectRebalancingCurrentlyRentingPartitions
ecec080 is described below

commit ecec0801c67c9f8bd3ce234adbca665f876e03ad
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Fri Jun 21 21:56:30 2019 +0300

    IGNITE-11867 Fix flaky test GridCacheRebalancingWithAsyncClearingTest#testCorrectRebalancingCurrentlyRentingPartitions
    
    Signed-off-by: Ivan Rakov <ir...@apache.org>
---
 .../dht/preloader/GridDhtPartitionDemander.java    | 320 +++++++++++----------
 .../preloader/GridDhtPartitionsExchangeFuture.java |  19 +-
 .../dht/preloader/GridDhtPreloader.java            |   6 +-
 .../dht/topology/GridDhtLocalPartition.java        |  45 ++-
 .../dht/topology/GridDhtPartitionTopologyImpl.java |  91 ++++--
 .../dht/topology/PartitionsEvictManager.java       |   4 +
 .../GridCacheDatabaseSharedManager.java            |   8 +-
 .../cache/persistence/GridCacheOffheapManager.java |   2 +-
 .../cache/persistence/file/FilePageStore.java      |   9 +-
 .../ignite/failure/IoomFailureHandlerTest.java     |   7 +-
 .../distributed/CacheRentingStateRepairTest.java   | 138 ++++++++-
 ...sticOriginatingNodeFailureAbstractSelfTest.java |   4 +
 ...dCacheRebalancingWithAsyncClearingMvccTest.java |   5 +
 .../GridCacheRebalancingWithAsyncClearingTest.java |  13 +-
 ...eRebalanceOnCachesStoppingOrDestroyingTest.java |   4 +-
 .../junits/common/GridCommonAbstractTest.java      |  12 +-
 .../ignite/testsuites/IgnitePdsMvccTestSuite3.java |   6 +-
 .../ignite/testsuites/IgnitePdsMvccTestSuite4.java |   4 +
 18 files changed, 467 insertions(+), 230 deletions(-)
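
At a high level, the patch serializes supply message processing against rebalance cancellation with a new
ReentrantReadWriteLock (cancelLock) inside RebalanceFuture: demand routines hold the read lock while applying a
batch, and cancel() takes the write lock before partition clearing may begin. The standalone sketch below only
illustrates that ordering pattern; the class and method names are simplified placeholders, not the actual Ignite API.

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class RebalanceSession {
        /** Orders supply-message processing (readers) against cancellation (single writer). */
        private final ReentrantReadWriteLock cancelLock = new ReentrantReadWriteLock();

        void handleSupplyMessage(Runnable applyBatch) {
            cancelLock.readLock().lock();

            try {
                // Safe to apply entries here: cancel() cannot run concurrently.
                applyBatch.run();
            }
            finally {
                cancelLock.readLock().unlock();
            }
        }

        void cancel(Runnable startClearing) {
            cancelLock.writeLock().lock();

            try {
                // All in-flight supply messages have been processed; clearing may start now.
                startClearing.run();
            }
            finally {
                cancelLock.writeLock().unlock();
            }
        }
    }

A read-write lock fits this shape because many demand routines may process batches concurrently, while cancellation
is a single exclusive event.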

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 1c17a6f..0cf5aba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheRebalanceMode;
@@ -319,9 +320,8 @@ public class GridDhtPartitionDemander {
                     metrics.clearRebalanceCounters();
 
                     for (GridDhtPartitionDemandMessage msg : assignments.values()) {
-                        for (Integer partId : msg.partitions().fullSet()) {
+                        for (Integer partId : msg.partitions().fullSet())
                             metrics.onRebalancingKeysCountEstimateReceived(grp.topology().globalPartSizes().get(partId));
-                        }
 
                         CachePartitionPartialCountersMap histMap = msg.partitions().historicalMap();
 
@@ -523,6 +523,7 @@ public class GridDhtPartitionDemander {
 
                             if (log.isInfoEnabled())
                                 log.info("Started rebalance routine [" + grp.cacheOrGroupName() +
+                                    ", topVer=" + fut.topologyVersion() +
                                     ", supplier=" + node.id() + ", topic=" + topicId +
                                     ", fullPartitions=" + S.compact(stripePartitions.get(topicId).fullSet()) +
                                     ", histPartitions=" + S.compact(stripePartitions.get(topicId).historicalSet()) + "]");
@@ -667,195 +668,204 @@ public class GridDhtPartitionDemander {
 
         final RebalanceFuture fut = rebalanceFut;
 
-        ClusterNode node = ctx.node(nodeId);
+        try {
+            fut.cancelLock.readLock().lock();
 
-        if (node == null) {
-            if (log.isDebugEnabled())
-                log.debug("Supply message ignored (supplier has left cluster) [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
+            ClusterNode node = ctx.node(nodeId);
 
-            return;
-        }
+            if (node == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Supply message ignored (supplier has left cluster) [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
 
-        // Topology already changed (for the future that supply message based on).
-        if (topologyChanged(fut) || !fut.isActual(supplyMsg.rebalanceId())) {
-            if (log.isDebugEnabled())
-                log.debug("Supply message ignored (topology changed) [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
+                return;
+            }
 
-            return;
-        }
+            // Topology already changed (for the future that the supply message is based on).
+            if (topologyChanged(fut) || !fut.isActual(supplyMsg.rebalanceId())) {
+                if (log.isDebugEnabled())
+                    log.debug("Supply message ignored (topology changed) [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
 
-        if (log.isDebugEnabled())
-            log.debug("Received supply message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
+                return;
+            }
 
-        // Check whether there were error during supply message unmarshalling process.
-        if (supplyMsg.classError() != null) {
-            U.warn(log, "Rebalancing from node cancelled [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]" +
-                ". Supply message couldn't be unmarshalled: " + supplyMsg.classError());
+            if (log.isDebugEnabled())
+                log.debug("Received supply message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
 
-            fut.cancel(nodeId);
+            // Check whether there was an error during the supply message unmarshalling process.
+            if (supplyMsg.classError() != null) {
+                U.warn(log, "Rebalancing from node cancelled [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]" +
+                    ". Supply message couldn't be unmarshalled: " + supplyMsg.classError());
 
-            return;
-        }
+                fut.cancel(nodeId);
 
-        // Check whether there were error during supplying process.
-        if (supplyMsg.error() != null) {
-            U.warn(log, "Rebalancing from node cancelled [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]" +
-                "]. Supplier has failed with error: " + supplyMsg.error());
+                return;
+            }
 
-            fut.cancel(nodeId);
+            // Check whether there was an error during the supplying process.
+            if (supplyMsg.error() != null) {
+                U.warn(log, "Rebalancing from node cancelled [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]" +
+                    "]. Supplier has failed with error: " + supplyMsg.error());
 
-            return;
-        }
+                fut.cancel(nodeId);
 
-        final GridDhtPartitionTopology top = grp.topology();
+                return;
+            }
 
-        if (grp.sharedGroup()) {
-            for (GridCacheContext cctx : grp.caches()) {
-                if (cctx.statisticsEnabled()) {
-                    long keysCnt = supplyMsg.keysForCache(cctx.cacheId());
+            final GridDhtPartitionTopology top = grp.topology();
 
-                    if (keysCnt != -1)
-                        cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt);
+            if (grp.sharedGroup()) {
+                for (GridCacheContext cctx : grp.caches()) {
+                    if (cctx.statisticsEnabled()) {
+                        long keysCnt = supplyMsg.keysForCache(cctx.cacheId());
 
-                    // Can not be calculated per cache.
-                    cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
+                        if (keysCnt != -1)
+                            cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt);
+
+                        // Can not be calculated per cache.
+                        cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
+                    }
                 }
             }
-        }
-        else {
-            GridCacheContext cctx = grp.singleCacheContext();
+            else {
+                GridCacheContext cctx = grp.singleCacheContext();
 
-            if (cctx.statisticsEnabled()) {
-                if (supplyMsg.estimatedKeysCount() != -1)
-                    cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supplyMsg.estimatedKeysCount());
+                if (cctx.statisticsEnabled()) {
+                    if (supplyMsg.estimatedKeysCount() != -1)
+                        cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supplyMsg.estimatedKeysCount());
 
-                cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
+                    cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
+                }
             }
-        }
 
-        try {
-            AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
+            try {
+                AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
 
-            // Preload.
-            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supplyMsg.infos().entrySet()) {
-                int p = e.getKey();
+                // Preload.
+                for (Map.Entry<Integer, CacheEntryInfoCollection> e : supplyMsg.infos().entrySet()) {
+                    int p = e.getKey();
 
-                if (aff.get(p).contains(ctx.localNode())) {
-                    GridDhtLocalPartition part;
+                    if (aff.get(p).contains(ctx.localNode())) {
+                        GridDhtLocalPartition part;
 
-                    try {
-                        part = top.localPartition(p, topVer, true);
-                    }
-                    catch (GridDhtInvalidPartitionException err) {
-                        assert !topVer.equals(top.lastTopologyChangeVersion());
-
-                        if (log.isDebugEnabled()) {
-                            log.debug("Failed to get partition for rebalancing [" +
-                                "grp=" + grp.cacheOrGroupName() +
-                                ", err=" + err +
-                                ", p=" + p +
-                                ", topVer=" + topVer +
-                                ", lastTopVer=" + top.lastTopologyChangeVersion() + ']');
+                        try {
+                            part = top.localPartition(p, topVer, true);
                         }
+                        catch (GridDhtInvalidPartitionException err) {
+                            assert !topVer.equals(top.lastTopologyChangeVersion());
+
+                            if (log.isDebugEnabled()) {
+                                log.debug("Failed to get partition for rebalancing [" +
+                                    "grp=" + grp.cacheOrGroupName() +
+                                    ", err=" + err +
+                                    ", p=" + p +
+                                    ", topVer=" + topVer +
+                                    ", lastTopVer=" + top.lastTopologyChangeVersion() + ']');
+                            }
 
-                        continue;
-                    }
+                            continue;
+                        }
 
-                    assert part != null;
+                        assert part != null;
 
-                    boolean last = supplyMsg.last().containsKey(p);
+                        boolean last = supplyMsg.last().containsKey(p);
 
-                    if (part.state() == MOVING) {
-                        boolean reserved = part.reserve();
+                        if (part.state() == MOVING) {
+                            boolean reserved = part.reserve();
 
-                        assert reserved : "Failed to reserve partition [igniteInstanceName=" +
-                            ctx.igniteInstanceName() + ", grp=" + grp.cacheOrGroupName() + ", part=" + part + ']';
+                            assert reserved : "Failed to reserve partition [igniteInstanceName=" +
+                                ctx.igniteInstanceName() + ", grp=" + grp.cacheOrGroupName() + ", part=" + part + ']';
 
-                        part.lock();
+                            part.lock();
 
-                        try {
-                            Iterator<GridCacheEntryInfo> infos = e.getValue().infos().iterator();
+                            part.beforeApplyBatch(last);
 
-                            if (grp.mvccEnabled())
-                                mvccPreloadEntries(topVer, node, p, infos);
-                            else
-                                preloadEntries(topVer, node, p, infos);
+                            try {
+                                Iterator<GridCacheEntryInfo> infos = e.getValue().infos().iterator();
+
+                                if (grp.mvccEnabled())
+                                    mvccPreloadEntries(topVer, node, p, infos);
+                                else
+                                    preloadEntries(topVer, node, p, infos);
 
-                            // If message was last for this partition,
-                            // then we take ownership.
-                            if (last) {
-                                fut.partitionDone(nodeId, p, true);
+                                // If message was last for this partition,
+                                // then we take ownership.
+                                if (last) {
+                                    fut.partitionDone(nodeId, p, true);
 
-                                if (log.isDebugEnabled())
-                                    log.debug("Finished rebalancing partition: " +
-                                        "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]");
+                                    if (log.isDebugEnabled())
+                                        log.debug("Finished rebalancing partition: " +
+                                            "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]");
+                                }
+                            }
+                            finally {
+                                part.unlock();
+                                part.release();
                             }
                         }
-                        finally {
-                            part.unlock();
-                            part.release();
+                        else {
+                            if (last)
+                                fut.partitionDone(nodeId, p, false);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Skipping rebalancing partition (state is not MOVING): " +
+                                    "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]");
                         }
                     }
                     else {
-                        if (last)
-                            fut.partitionDone(nodeId, p, false);
+                        fut.partitionDone(nodeId, p, false);
 
                         if (log.isDebugEnabled())
-                            log.debug("Skipping rebalancing partition (state is not MOVING): " +
+                            log.debug("Skipping rebalancing partition (affinity changed): " +
                                 "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]");
                     }
                 }
-                else {
-                    fut.partitionDone(nodeId, p, false);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Skipping rebalancing partition (affinity changed): " +
-                            "[" + demandRoutineInfo(topicId, nodeId, supplyMsg) + ", p=" + p + "]");
+                // Only request partitions based on latest topology version.
+                for (Integer miss : supplyMsg.missed()) {
+                    if (aff.get(miss).contains(ctx.localNode()))
+                        fut.partitionMissed(nodeId, miss);
                 }
-            }
 
-            // Only request partitions based on latest topology version.
-            for (Integer miss : supplyMsg.missed()) {
-                if (aff.get(miss).contains(ctx.localNode()))
-                    fut.partitionMissed(nodeId, miss);
-            }
+                for (Integer miss : supplyMsg.missed())
+                    fut.partitionDone(nodeId, miss, false);
 
-            for (Integer miss : supplyMsg.missed())
-                fut.partitionDone(nodeId, miss, false);
+                GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
+                    supplyMsg.rebalanceId(),
+                    supplyMsg.topologyVersion(),
+                    grp.groupId());
 
-            GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
-                supplyMsg.rebalanceId(),
-                supplyMsg.topologyVersion(),
-                grp.groupId());
+                d.timeout(grp.preloader().timeout());
 
-            d.timeout(grp.preloader().timeout());
-
-            d.topic(rebalanceTopics.get(topicId));
+                d.topic(rebalanceTopics.get(topicId));
 
-            if (!topologyChanged(fut) && !fut.isDone()) {
-                // Send demand message.
-                try {
-                    ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId),
-                        d.convertIfNeeded(node.version()), grp.ioPolicy(), grp.preloader().timeout());
+                if (!topologyChanged(fut) && !fut.isDone()) {
+                    // Send demand message.
+                    try {
+                        ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId),
+                            d.convertIfNeeded(node.version()), grp.ioPolicy(), grp.preloader().timeout());
 
-                    if (log.isDebugEnabled())
-                        log.debug("Send next demand message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
+                        if (log.isDebugEnabled())
+                            log.debug("Send next demand message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) + "]");
+                    }
+                    catch (ClusterTopologyCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Supplier has left [" + demandRoutineInfo(topicId, nodeId, supplyMsg) +
+                                ", errMsg=" + e.getMessage() + ']');
+                    }
                 }
-                catch (ClusterTopologyCheckedException e) {
+                else {
                     if (log.isDebugEnabled())
-                        log.debug("Supplier has left [" + demandRoutineInfo(topicId, nodeId, supplyMsg) +
-                            ", errMsg=" + e.getMessage() + ']');
+                        log.debug("Will not request next demand message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) +
+                            ", topChanged=" + topologyChanged(fut) + ", rebalanceFuture=" + fut + "]");
                 }
             }
-            else {
-                if (log.isDebugEnabled())
-                    log.debug("Will not request next demand message [" + demandRoutineInfo(topicId, nodeId, supplyMsg) +
-                        ", topChanged=" + topologyChanged(fut) + ", rebalanceFuture=" + fut + "]");
+            catch (IgniteSpiException | IgniteCheckedException e) {
+                LT.error(log, e, "Error during rebalancing [" + demandRoutineInfo(topicId, nodeId, supplyMsg) +
+                    ", err=" + e + ']');
             }
         }
-        catch (IgniteSpiException | IgniteCheckedException e) {
-            LT.error(log, e, "Error during rebalancing [" + demandRoutineInfo(topicId, nodeId, supplyMsg) +
-                ", err=" + e + ']');
+        finally {
+            fut.cancelLock.readLock().unlock();
         }
     }
 
@@ -1207,11 +1217,17 @@ public class GridDhtPartitionDemander {
         /** The number of rebalance routines. */
         private final long routines;
 
+        /** Used to order rebalance cancellation and supply message processing; they must not overlap.
+         * Otherwise partition clearing could start on a partition that is still rebalancing, resulting in
+         * eviction of a partition in OWNING state. */
+        private final ReentrantReadWriteLock cancelLock;
+
         /**
          * @param grp Cache group.
          * @param assignments Assignments.
          * @param log Logger.
-         * @param rebalanceId Rebalance id.
+         *
+         * @param rebalanceId Rebalance id.
          */
         RebalanceFuture(
             CacheGroupContext grp,
@@ -1237,6 +1253,8 @@ public class GridDhtPartitionDemander {
             this.rebalanceId = rebalanceId;
 
             ctx = grp.shared();
+
+            cancelLock = new ReentrantReadWriteLock();
         }
 
         /**
@@ -1250,6 +1268,7 @@ public class GridDhtPartitionDemander {
             this.log = null;
             this.rebalanceId = -1;
             this.routines = 0;
+            this.cancelLock = new ReentrantReadWriteLock();
         }
 
         /**
@@ -1280,24 +1299,33 @@ public class GridDhtPartitionDemander {
          * @return {@code True}.
          */
         @Override public boolean cancel() {
-            synchronized (this) {
-                if (isDone())
-                    return true;
+            try {
+                // The cancel lock is needed only for the case when some message might be in flight while
+                // rebalancing is cancelled.
+                cancelLock.writeLock().lock();
 
-                U.log(log, "Cancelled rebalancing from all nodes [grp=" + grp.cacheOrGroupName() +
-                    ", topVer=" + topologyVersion() + "]");
+                synchronized (this) {
+                    if (isDone())
+                        return true;
 
-                if (!ctx.kernalContext().isStopping()) {
-                    for (UUID nodeId : remaining.keySet())
-                        cleanupRemoteContexts(nodeId);
-                }
+                    U.log(log, "Cancelled rebalancing from all nodes [grp=" + grp.cacheOrGroupName() +
+                        ", topVer=" + topologyVersion() + "]");
 
-                remaining.clear();
+                    if (!ctx.kernalContext().isStopping()) {
+                        for (UUID nodeId : remaining.keySet())
+                            cleanupRemoteContexts(nodeId);
+                    }
 
-                checkIsDone(true /* cancelled */);
-            }
+                    remaining.clear();
 
-            return true;
+                    checkIsDone(true /* cancelled */);
+                }
+
+                return true;
+            }
+            finally {
+                cancelLock.writeLock().unlock();
+            }
         }
 
         /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 319e6b8..103cf47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -352,8 +352,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** Discovery lag / Clocks discrepancy, calculated on coordinator when all single messages are received. */
     private T2<Long, UUID> discoveryLag;
 
-    /** TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11799 */
-    private Map<Integer, Set<Integer>> clearingPartitions;
+    /** Partitions scheduled for historical rebalance for this topology version. */
+    private Map<Integer, Set<Integer>> histPartitions;
 
     /**
      * @param cctx Cache context.
@@ -1437,7 +1437,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             cctx.exchange().exchangerBlockingSectionEnd();
         }
 
-        clearingPartitions = new HashMap();
+        histPartitions = new HashMap();
 
         timeBag.finishGlobalStage("WAL history reservation");
 
@@ -5046,21 +5046,18 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     * If partition is clearing or already cleared we need full rebalance even if supplier is exists.
-     * (it still could be used by other demanders)
-     *
      * @param grp Group.
      * @param part Partition.
      */
-    public boolean isClearingPartition(CacheGroupContext grp, int part) {
+    public boolean isHistoryPartition(CacheGroupContext grp, int part) {
         if (!grp.persistenceEnabled())
             return false;
 
         synchronized (mux) {
-            if (clearingPartitions == null)
+            if (histPartitions == null)
                 return false;
 
-            Set<Integer> parts = clearingPartitions.get(grp.groupId());
+            Set<Integer> parts = histPartitions.get(grp.groupId());
 
             return parts != null && parts.contains(part);
         }
@@ -5070,12 +5067,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @param grp Group.
      * @param part Partition.
      */
-    public void addClearingPartition(CacheGroupContext grp, int part) {
+    public void addHistoryPartition(CacheGroupContext grp, int part) {
         if (!grp.persistenceEnabled())
             return;
 
         synchronized (mux) {
-            clearingPartitions.computeIfAbsent(grp.groupId(), k -> new HashSet()).add(part);
+            histPartitions.computeIfAbsent(grp.groupId(), k -> new HashSet()).add(part);
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index a40d08d..b57e062 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -255,9 +255,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                     if (part.reserve()) {
                         part.moving();
 
-                        if (exchFut != null)
-                            exchFut.addClearingPartition(grp, part.id());
-
                         part.clearAsync();
 
                         part.release();
@@ -282,8 +279,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                         histSupplier = ctx.discovery().node(nodeId);
                 }
 
-                // Clearing partition should always be fully reloaded.
-                if (histSupplier != null && !exchFut.isClearingPartition(grp, p)) {
+                if (histSupplier != null && exchFut.isHistoryPartition(grp, p)) {
                     assert grp.persistenceEnabled();
                     assert remoteOwners(p, topVer).contains(histSupplier) : remoteOwners(p, topVer);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index a131a21..5e637e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
@@ -379,9 +380,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
 
         // Make sure to remove exactly this entry.
         removeEntry(entry);
-
-        // Attempt to evict.
-        tryContinueClearing();
     }
 
     /**
@@ -518,14 +516,12 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
 
             // Decrement reservations.
             if (this.state.compareAndSet(state, newState)) {
-                // If no more reservations try to continue delayed renting or clearing process.
-                if (reservations == 0) {
-                    if (delayedRenting)
-                        rent(true);
-                    else
-                        tryContinueClearing();
-                }
+                // If no more reservations try to continue delayed renting.
+                if (reservations == 0 && delayedRenting)
+                    rent(true);
 
+                // A partition can only be reserved in OWNING state, so no further actions
+                // are required.
                 break;
             }
         }
@@ -697,6 +693,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @param updateSeq Update sequence.
      */
     private void clearAsync0(boolean updateSeq) {
+        // Method is expected to be called from the exchange worker or a rebalancing thread when rebalancing is done.
         long state = this.state.get();
 
         GridDhtPartitionState partState = getPartState(state);
@@ -747,7 +744,16 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
 
         clear = true;
 
-        clearAsync0(false);
+        GridDhtPartitionDemander.RebalanceFuture rebFut =
+            (GridDhtPartitionDemander.RebalanceFuture)grp.preloader().rebalanceFuture();
+
+        // Make sure the current rebalance future finishes before clearing
+        // to avoid clearing a partition that is still being rebalanced.
+        // NOTE: this invariant does not hold for the initial rebalance future.
+        if (rebFut.topologyVersion() != null && state0 == MOVING && !rebFut.isDone())
+            rebFut.listen(fut -> clearAsync0(false));
+        else
+            clearAsync0(false);
     }
 
     /**
@@ -907,6 +913,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * Only one thread is allowed to do such process concurrently.
      * At the end of clearing method completes {@code clearFuture}.
      *
+     * @param evictionCtx Eviction context.
+     *
      * @return {@code false} if clearing is not started due to existing reservations.
      * @throws NodeStoppingException If node is stopping.
      */
@@ -961,7 +969,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * Tries to continue delayed partition clearing.
      */
     public void onUnlock() {
-        tryContinueClearing();
+        // No-op.
     }
 
     /**
@@ -1144,7 +1152,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
                     CacheDataRow row = it0.next();
 
                     // Do not clear fresh rows in case of partition reloading.
-                    // This is required because updates are possible to moving partition which is currently cleared.
+                    // This is required because normal updates are possible to a moving partition which is currently being cleared.
                     if (row.version().compareTo(clearVer) >= 0 && (state() == MOVING && clear))
                         continue;
 
@@ -1451,6 +1459,15 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * Called before the next batch is applied during rebalance. Currently used for tests.
+     *
+     * @param last {@code True} if last batch for partition.
+     */
+    public void beforeApplyBatch(boolean last) {
+        // No-op.
+    }
+
+    /**
      * Removed entry holder.
      */
     private static class RemovedEntryHolder {
@@ -1598,7 +1615,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
         public void finish() {
             synchronized (this) {
                 onDone();
-                finished = true;
+                finished = true; // Marks state when all future listeners are finished.
             }
         }
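
The clearAsync() change above defers clearing while a rebalance is still in progress by listening on the current
rebalance future instead of clearing immediately. A minimal sketch of that defer-or-run-now decision, using
CompletableFuture in place of Ignite's internal future type (all names here are illustrative, not the real API):

    import java.util.concurrent.CompletableFuture;

    class PartitionSketch {
        enum State { MOVING, OWNING, RENTING }

        private volatile State state = State.MOVING;

        /** Clears immediately, or only after the ongoing rebalance completes,
         *  so a partition that is still receiving entries is never cleared. */
        void clearAsync(CompletableFuture<?> rebalanceFut) {
            if (state == State.MOVING && rebalanceFut != null && !rebalanceFut.isDone())
                rebalanceFut.whenComplete((res, err) -> doClear());
            else
                doClear();
        }

        private void doClear() {
            // Actual row removal would go here.
        }
    }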
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 012c30b..a831e78 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -152,6 +152,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** */
     private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
 
+    /** Factory used for re-creating a partition during its lifecycle. */
+    private PartitionFactory partFactory;
+
     /**
      * @param ctx Cache shared context.
      * @param grp Cache group.
@@ -173,6 +176,17 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions());
 
         cntrMap = new CachePartitionFullCountersMap(locParts.length());
+
+        partFactory = (ctx1, grp1, id) -> new GridDhtLocalPartition(ctx1, grp1, id, false);
+    }
+
+    /**
+     * Sets the partition factory to use. Currently used for tests.
+     *
+     * @param factory Factory.
+     */
+    public void partitionFactory(PartitionFactory factory) {
+        this.partFactory = factory;
     }
 
     /** {@inheritDoc} */
@@ -494,15 +508,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         for (int p = 0; p < partitions; p++) {
             if (node2part != null && node2part.valid()) {
                 if (localNode(p, aff)) {
-                    // This will make sure that all non-existing partitions
-                    // will be created in MOVING state.
-                    boolean existing = locParts.get(p) != null;
-
                     GridDhtLocalPartition locPart = getOrCreatePartition(p);
 
-                    if (existing && locPart.state() == MOVING && !locPart.isEmpty())
-                        exchFut.addClearingPartition(grp, p);
-
                     updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer);
                 }
             }
@@ -781,19 +788,26 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                                 "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", owners = " + owners + ']');
                                     }
 
-                                // If partition was not still cleared yet start clearing if needed.
-                                // Important: avoid calling clearAsync multiple times in the same rebalance session
-                                // or bad things may happen depending on timing.
-                                if (exchFut.isClearingPartition(grp, p) && !locPart.isClearing() && !locPart.isEmpty())
-                                    locPart.clearAsync();
+                                    // It's important to clear non-empty moving partitions before full rebalancing.
+                                    // Consider the scenario:
+                                    // Node1 has keys k1 and k2 in the same partition.
+                                    // Node2 started rebalancing from Node1.
+                                    // Node2 received k1, k2 and failed before moving the partition to OWNING state.
+                                    // Node1 removes k2, but the update has not been delivered to Node2 because of the failure.
+                                    // After a new full rebalance Node1 will only send k1 to Node2, causing a lost removal.
+                                    // NOTE: avoid calling clearAsync for a partition twice per topology version.
+                                    // TODO FIXME clearing is not always needed, see IGNITE-11799
+                                    if (grp.persistenceEnabled() && !exchFut.isHistoryPartition(grp, locPart.id()) &&
+                                        !locPart.isClearing() && !locPart.isEmpty() && !grp.mvccEnabled())
+                                        locPart.clearAsync();
+                                }
+                                else
+                                    updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
                             }
-                            else
-                                updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
                         }
-                    }
-                    else {
-                        if (locPart != null) {
-                            GridDhtPartitionState state = locPart.state();
+                        else {
+                            if (locPart != null) {
+                                GridDhtPartitionState state = locPart.state();
 
                                 if (state == MOVING) {
                                     locPart.rent(false);
@@ -869,7 +883,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (loc != null)
                 loc.awaitDestroy();
 
-            locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, false));
+            locParts.set(p, loc = partFactory.create(ctx, grp, p));
 
             long updCntr = cntrMap.updateCounter(p);
 
@@ -897,8 +911,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         try {
             GridDhtLocalPartition part = locParts.get(p);
 
-            if (part != null && part.state() != EVICTED)
-                return part;
+            if (part != null) {
+                if (part.state() != EVICTED)
+                    return part;
+                else
+                    part.awaitDestroy();
+            }
 
             part = new GridDhtLocalPartition(ctx, grp, p, true);
 
@@ -977,7 +995,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                             "[grp=" + grp.cacheOrGroupName() + ", part=" + p + ", topVer=" + topVer +
                             ", this.topVer=" + this.readyTopVer + ']');
 
-                    locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, false));
+                    locParts.set(p, loc = partFactory.create(ctx, grp, p));
 
                     this.updateSeq.incrementAndGet();
 
@@ -2312,8 +2330,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * Prevents ongoing renting if required.
      *
      * @param p Partition id.
-     * @param clear If {@code true} partition have to be cleared before rebalance.
-     *              Required in case of full state transfer to handle removals on supplier.
+     * @param clear If {@code true} the partition has to be cleared before rebalance (full rebalance or rebalance
+     * restart after cancellation).
      * @param exchFut Future related to partition state change.
      */
     private GridDhtLocalPartition rebalancePartition(int p, boolean clear, GridDhtPartitionsExchangeFuture exchFut) {
@@ -2335,11 +2353,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         if (part.state() != MOVING)
             part.moving();
 
-        if (clear) {
-            exchFut.addClearingPartition(grp, part.id());
-
-            part.clearAsync();
-        }
+        if (!clear)
+            exchFut.addHistoryPartition(grp, part.id());
 
         assert part.state() == MOVING : part;
 
@@ -2451,7 +2466,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                             if (log.isDebugEnabled())
                                 log.debug("Partitions have been scheduled to resend [reason=" +
-                                    "Evictions are done [grp" + grp.cacheOrGroupName() + "]");
+                                    "Evictions are done [grp=" + grp.cacheOrGroupName() + "]");
 
                             ctx.exchange().scheduleResendPartitions();
                         }
@@ -3103,4 +3118,20 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             throw new UnsupportedOperationException("remove");
         }
     }
+
+    /**
+     * Partition factory used for (re-)creating partitions during their lifecycle.
+     * Currently used in tests for overriding default partition behavior.
+     */
+    public interface PartitionFactory {
+        /**
+         * @param ctx Context.
+         * @param grp Group.
+         * @param id Partition id.
+         * @return New partition instance.
+         */
+        public GridDhtLocalPartition create(GridCacheSharedContext ctx,
+            CacheGroupContext grp,
+            int id);
+    }
 }
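
The new PartitionFactory hook makes partition creation pluggable so tests can return a subclass that overrides
lifecycle callbacks such as beforeApplyBatch(). A self-contained sketch of the same hook pattern, with simplified
names that are not the actual Ignite types:

    class TopologySketch {
        /** Factory hook: production code keeps the default, tests swap in an instrumented partition. */
        interface PartFactory {
            Part create(int id);
        }

        static class Part {
            final int id;

            Part(int id) {
                this.id = id;
            }

            /** Lifecycle callback a test may override (analogous to beforeApplyBatch above). */
            void beforeApplyBatch(boolean last) {
                // No-op by default.
            }
        }

        private PartFactory factory = Part::new;

        /** Test hook, mirrors partitionFactory(...) above. */
        void partitionFactory(PartFactory factory) {
            this.factory = factory;
        }

        Part createPartition(int id) {
            return factory.create(id);
        }
    }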
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
index c458d3e..b294843 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
@@ -412,8 +412,12 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
             }
 
             try {
+                assert part.state() != GridDhtPartitionState.OWNING : part;
+
                 boolean success = part.tryClear(grpEvictionCtx);
 
+                assert part.state() != GridDhtPartitionState.OWNING : part;
+
                 if (success) {
                     if (part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy())
                         part.destroy();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 935f403..1975c0f 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2401,8 +2401,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             log.info("Finished applying memory changes [changesApplied=" + applied +
                 ", time=" + (U.currentTimeMillis() - start) + " ms]");
 
-            assert applied.get() > 0;
-
             finalizeCheckpointOnRecovery(status.cpStartTs, status.cpStartId, status.startPtr, exec);
         }
 
@@ -2767,7 +2765,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                                 int partId = cacheState.partitionByIndex(i);
                                 byte state = cacheState.stateByIndex(i);
 
-                                partitionRecoveryStates.put(new GroupPartitionId(entry.getKey(), partId), (int)state);
+                                // Ignore undefined state.
+                                if (state != -1) {
+                                    partitionRecoveryStates.put(new GroupPartitionId(entry.getKey(), partId),
+                                        (int)state);
+                                }
                             }
                         }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index c5045ba..8207b2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -559,7 +559,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                     ctx.database().checkpointReadUnlock();
                 }
             }
-            else if (recoverState != null && recoverState >= 0) { // Pre-create partition if having valid state.
+            else if (recoverState != null) { // Pre-create partition if having valid state.
                 GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
 
                 updateState(part, recoverState);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index adaf118..8e8494b 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -302,8 +302,15 @@ public class FilePageStore implements PageStore {
         lock.writeLock().lock();
 
         try {
-            if (!inited)
+            if (!inited) {
+                if (fileIO != null) // Ensure the file is closed even if not initialized yet.
+                    fileIO.close();
+
+                if (delete && exists())
+                    Files.delete(pathProvider.apply().toAbsolutePath());
+
                 return;
+            }
 
             fileIO.force();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/IoomFailureHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/failure/IoomFailureHandlerTest.java
index 9f95f41..402c1a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/failure/IoomFailureHandlerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/failure/IoomFailureHandlerTest.java
@@ -61,8 +61,13 @@ public class IoomFailureHandlerTest extends AbstractFailureHandlerTest {
         dfltPlcCfg.setInitialSize(SIZE);
         dfltPlcCfg.setMaxSize(SIZE);
 
-        if (pds)
+        if (pds) {
+            // We need a longer failure detection timeout in PDS-enabled mode, otherwise the checkpoint write lock
+            // can block the tx checkpoint read lock for too long, triggering the failure handler on slow hardware.
+            cfg.setFailureDetectionTimeout(30_000);
+
             dfltPlcCfg.setPersistenceEnabled(true);
+        }
 
         dsCfg.setDefaultDataRegionConfiguration(dfltPlcCfg);
         dsCfg.setPageSize(PAGE_SIZE);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRentingStateRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRentingStateRepairTest.java
index 87402b6..77e72bb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRentingStateRepairTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRentingStateRepairTest.java
@@ -28,10 +28,14 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
@@ -40,16 +44,21 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJU
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
- *
+ * Contains several test scenarios related to partition state transitions during a partition's lifecycle.
  */
 public class CacheRentingStateRepairTest extends GridCommonAbstractTest {
     /** */
     public static final int PARTS = 1024;
 
+    /** */
+    private static final String CLIENT = "client";
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
+        cfg.setClientMode(CLIENT.equals(igniteInstanceName));
+
         CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
 
         ccfg.setAffinity(new RendezvousAffinityFunction(false, PARTS).setPartitions(64));
@@ -73,6 +82,7 @@ public class CacheRentingStateRepairTest extends GridCommonAbstractTest {
         DataStorageConfiguration memCfg = new DataStorageConfiguration().setPageSize(1024)
             .setDefaultDataRegionConfiguration(
                 new DataRegionConfiguration().setPersistenceEnabled(true).setInitialSize(sz).setMaxSize(sz))
+            .setWalSegmentSize(8 * 1024 * 1024)
             .setWalMode(WALMode.LOG_ONLY).setCheckpointFrequency(24L * 60 * 60 * 1000);
 
         cfg.setDataStorageConfiguration(memCfg);
@@ -100,7 +110,7 @@ public class CacheRentingStateRepairTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
+     * Tests that a partition is properly evicted when the node is restarted in the middle of eviction.
      */
     @Test
     public void testRentingStateRepairAfterRestart() throws Exception {
@@ -194,6 +204,130 @@ public class CacheRentingStateRepairTest extends GridCommonAbstractTest {
         }
     }
 
+    /**
+     * Tests that a partition being rebalanced after renting is not cleared when a server node joins.
+     */
+    @Test
+    public void testRebalanceRentingPartitionAndServerNodeJoin() throws Exception {
+        testRebalanceRentingPartitionAndNodeJoin(false, 0);
+    }
+
+    /**
+     * Tests that a partition being rebalanced after renting is not cleared when a client node joins.
+     */
+    @Test
+    public void testRebalanceRentingPartitionAndClientNodeJoin() throws Exception {
+        testRebalanceRentingPartitionAndNodeJoin(true, 0);
+    }
+
+    /**
+     * Tests that a partition being rebalanced after renting is not cleared when a server node joins (with a delayed rebalance finish).
+     */
+    @Test
+    public void testRebalanceRentingPartitionAndServerNodeJoinWithDelay() throws Exception {
+        testRebalanceRentingPartitionAndNodeJoin(false, 5_000);
+    }
+
+    /**
+     * Tests that a partition being rebalanced after renting is not cleared when a client node joins (with a delayed rebalance finish).
+     */
+    @Test
+    public void testRebalanceRentingPartitionAndClientNodeJoinWithDelay() throws Exception {
+        testRebalanceRentingPartitionAndNodeJoin(true, 5_000);
+    }
+
+    /**
+     * @param client {@code True} for client node join.
+     * @param delay Delay in milliseconds before applying the last rebalance batch (widens the race window).
+     *
+     * @throws Exception if failed.
+     */
+    private void testRebalanceRentingPartitionAndNodeJoin(boolean client, long delay) throws Exception {
+        try {
+            IgniteEx g0 = startGrids(2);
+
+            g0.cluster().active(true);
+
+            awaitPartitionMapExchange();
+
+            List<Integer> parts = evictingPartitionsAfterJoin(g0, g0.cache(DEFAULT_CACHE_NAME), 20);
+
+            int delayEvictPart = parts.get(0);
+
+            List<Integer> keys = partitionKeys(g0.cache(DEFAULT_CACHE_NAME), delayEvictPart, 2_000, 0);
+
+            for (Integer key : keys)
+                g0.cache(DEFAULT_CACHE_NAME).put(key, key);
+
+            GridDhtPartitionTopologyImpl top = (GridDhtPartitionTopologyImpl)dht(g0.cache(DEFAULT_CACHE_NAME)).topology();
+
+            GridDhtLocalPartition part = top.localPartition(delayEvictPart);
+
+            assertNotNull(part);
+
+            // Prevent eviction.
+            part.reserve();
+
+            startGrid(2);
+
+            resetBaselineTopology();
+
+            part.release();
+
+            part.rent(false).get();
+
+            CountDownLatch l1 = new CountDownLatch(1);
+            CountDownLatch l2 = new CountDownLatch(1);
+
+            // Create race between processing of final supply message and partition clearing.
+            top.partitionFactory((ctx, grp, id) -> id != delayEvictPart ? new GridDhtLocalPartition(ctx, grp, id, false) :
+                new GridDhtLocalPartition(ctx, grp, id, false) {
+                    @Override public void beforeApplyBatch(boolean last) {
+                        if (last) {
+                            l1.countDown();
+
+                            U.awaitQuiet(l2);
+
+                            if (delay > 0) // Delay rebalance finish to enforce race with clearing.
+                                doSleep(delay);
+                        }
+                    }
+                });
+
+            stopGrid(2);
+
+            resetBaselineTopology(); // Trigger rebalance for delayEvictPart after eviction.
+
+            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+                @Override public void run() {
+                    try {
+                        l1.await();
+
+                        // Trigger partition clear on next topology version.
+                        if (client)
+                            startGrid(CLIENT);
+                        else
+                            startGrid(2);
+
+                        l2.countDown(); // Finish partition rebalance after initiating clear.
+                    }
+                    catch (Exception e) {
+                        fail(X.getFullStackTrace(e));
+                    }
+                }
+            }, 1);
+
+            fut.get();
+
+            awaitPartitionMapExchange(true, true, null, true);
+
+            assertPartitionsSame(idleVerify(g0));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         cleanPersistenceDir();
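
The new test builds a deterministic race with two CountDownLatch instances: the rebalancing thread signals when the
last batch is about to be applied and then waits, while a second thread triggers the topology change that initiates
clearing before letting rebalancing finish. A stripped-down sketch of that coordination pattern follows (pure JDK,
no Ignite types; the comments mark where the Ignite-specific steps would go):

    import java.util.concurrent.CountDownLatch;

    class RaceSketch {
        public static void main(String[] args) throws Exception {
            CountDownLatch lastBatchReached = new CountDownLatch(1);
            CountDownLatch proceed = new CountDownLatch(1);

            Thread rebalancer = new Thread(() -> {
                lastBatchReached.countDown(); // Last batch is about to be applied (beforeApplyBatch(true)).

                try {
                    proceed.await();          // Hold rebalancing until the concurrent action has started.
                }
                catch (InterruptedException ignored) {
                    // No-op.
                }

                // ... apply the last batch and finish rebalancing ...
            });

            Thread topologyChanger = new Thread(() -> {
                try {
                    lastBatchReached.await();
                }
                catch (InterruptedException ignored) {
                    // No-op.
                }

                // ... trigger the topology change that would schedule partition clearing ...

                proceed.countDown();          // Let rebalancing finish after clearing has been initiated.
            });

            rebalancer.start();
            topologyChanger.start();

            rebalancer.join();
            topologyChanger.join();
        }
    }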
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
index 0399a77..8186385 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
@@ -289,6 +289,8 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
             }
         }
 
+        awaitPartitionMapExchange();
+
         for (Map.Entry<Integer, String> e : map.entrySet()) {
             long cntr0 = -1;
 
@@ -453,6 +455,8 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
             }
         }
 
+        awaitPartitionMapExchange();
+
         for (Map.Entry<Integer, String> e : map.entrySet()) {
             long cntr0 = -1;
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingMvccTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingMvccTest.java
index a8a9b4a..cadd25d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingMvccTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingMvccTest.java
@@ -26,4 +26,9 @@ public class GridCacheRebalancingWithAsyncClearingMvccTest extends GridCacheReba
     @Override protected CacheAtomicityMode atomicityMode() {
         return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
     }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return super.getTestTimeout() * 2; // Parent test generates a lot of data and is inherently slow in MVCC mode.
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingTest.java
index 8a6a962..db01b09 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingWithAsyncClearingTest.java
@@ -65,7 +65,7 @@ public class GridCacheRebalancingWithAsyncClearingTest extends GridCommonAbstrac
                 .setDefaultDataRegionConfiguration(
                     new DataRegionConfiguration()
                         .setPersistenceEnabled(true)
-                        .setMaxSize(100L * 1024 * 1024))
+                        .setMaxSize(300L * 1024 * 1024))
         );
 
         cfg.setCacheConfiguration(new CacheConfiguration<>(CACHE_NAME)
@@ -128,7 +128,7 @@ public class GridCacheRebalancingWithAsyncClearingTest extends GridCommonAbstrac
     public void testPartitionClearingNotBlockExchange() throws Exception {
         System.setProperty(IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, "1");
 
-        IgniteEx ig = (IgniteEx) startGrids(3);
+        IgniteEx ig = startGrids(3);
         ig.cluster().active(true);
 
         // High number of keys triggers long partition eviction.
@@ -223,7 +223,7 @@ public class GridCacheRebalancingWithAsyncClearingTest extends GridCommonAbstrac
      */
     @Test
     public void testCorrectRebalancingCurrentlyRentingPartitions() throws Exception {
-        IgniteEx ignite = (IgniteEx) startGrids(3);
+        IgniteEx ignite = startGrids(3);
         ignite.cluster().active(true);
 
         // High number of keys triggers long partition eviction.
@@ -232,11 +232,10 @@ public class GridCacheRebalancingWithAsyncClearingTest extends GridCommonAbstrac
         try (IgniteDataStreamer<Integer, Integer> ds = ignite.dataStreamer(CACHE_NAME)) {
             log.info("Writing initial data...");
 
-            ds.allowOverwrite(true);
             for (int k = 1; k <= keysCnt; k++) {
                 ds.addData(k, k);
 
-                if (k % 10_000 == 0)
+                if (k % 50_000 == 0)
                     log.info("Written " + k + " entities.");
             }
 
@@ -259,14 +258,12 @@ public class GridCacheRebalancingWithAsyncClearingTest extends GridCommonAbstrac
         // Started node should have partition in RENTING or EVICTED state.
         startGrid(1);
 
-        awaitPartitionMapExchange();
+        awaitPartitionMapExchange(true, true, null, true);
 
         // Check no data loss.
         for (int k = 1; k <= keysCnt; k++) {
             Integer val = (Integer) ignite.cache(CACHE_NAME).get(k);
-
             Assert.assertNotNull("Value for " + k + " is null", val);
-
             Assert.assertEquals("Check failed for " + k + " = " + val, k, (int)val);
         }
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
index f3eb906..286de48 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -257,6 +258,7 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
             .setRebalanceBatchSize(REBALANCE_BATCH_SIZE)
             .setCacheMode(CacheMode.REPLICATED)
             .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setAffinity(new RendezvousAffinityFunction(false, 64))
         ).collect(Collectors.toList());
 
         ig.getOrCreateCaches(configs);
@@ -264,7 +266,7 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
         configs.forEach(cfg -> {
             try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(cfg.getName())) {
                 for (int i = 0; i < KEYS_SIZE; i++)
-                    streamer.addData(i, new byte[1024]);
+                    streamer.addData(i, i);
 
                 streamer.flush();
             }
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index f2c6239..638e469 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -89,7 +89,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
@@ -130,6 +129,7 @@ import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -704,7 +704,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
                                     !affNodes.get(0).equals(dht.context().affinity().primaryByPartition(p, readyVer));
 
                                 if (affNodesCnt != ownerNodesCnt || !affNodes.containsAll(owners) ||
-                                    (waitEvicts && loc != null && loc.state() != GridDhtPartitionState.OWNING) ||
+                                    (waitEvicts && loc != null && loc.state() != OWNING) ||
                                     notPrimary) {
                                     if (i % 50 == 0)
                                         LT.warn(log(), "Waiting for topology map update [" +
@@ -794,7 +794,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
                                             ", locNode=" + g.cluster().localNode() + ']');
                                     }
 
-                                    if (entry.getValue() != GridDhtPartitionState.OWNING) {
+                                    if (entry.getValue() != OWNING) {
                                         LT.warn(log(),
                                             "Waiting for correct partition state part=" + entry.getKey()
                                                 + ", should be OWNING [state=" + entry.getValue() + "], node=" +
@@ -2263,7 +2263,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
         @Nullable GridDhtLocalPartition locPart =
             internalCache(grid(gridName).cache(DEFAULT_CACHE_NAME)).context().topology().localPartition(partId);
 
-        return locPart == null ? null : locPart.dataStore().partUpdateCounter();
+        return locPart == null || locPart.state() != OWNING ? null : locPart.dataStore().partUpdateCounter();
     }
 
     /**
@@ -2277,7 +2277,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
         @Nullable GridDhtLocalPartition locPart =
             internalCache(grid(gridName).cache(cacheName)).context().topology().localPartition(partId);
 
-        return locPart == null ? null : locPart.dataStore().partUpdateCounter();
+        return locPart == null || locPart.state() != OWNING ? null : locPart.dataStore().partUpdateCounter();
     }
 
     /**
@@ -2308,7 +2308,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
             PartitionUpdateCounter cntr = counter(partId, ignite.name());
 
             if (cntr0 != null) {
-                assertEquals("Expecting same counters", cntr0, cntr);
+                assertEquals("Expecting same counters [partId=" + partId + ']', cntr0, cntr);
 
                 if (withReserveCntr)
                     assertEquals("Expecting same reservation counters", cntr0.reserved(), cntr.reserved());
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite3.java
index e9792db..19c2fb9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite3.java
@@ -19,6 +19,8 @@ package org.apache.ignite.testsuites;
 import java.util.HashSet;
 import java.util.List;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithExpiryPolicy;
 import org.apache.ignite.testframework.junits.DynamicSuite;
 import org.junit.runner.RunWith;
 
@@ -35,7 +37,9 @@ public class IgnitePdsMvccTestSuite3 {
 
         HashSet<Class> ignoredTests = new HashSet<>();
 
-        // No ignored tests yet.
+        // TODO https://issues.apache.org/jira/browse/IGNITE-11937
+        ignoredTests.add(IgnitePdsContinuousRestartTest.class);
+        ignoredTests.add(IgnitePdsContinuousRestartTestWithExpiryPolicy.class);
 
         return IgnitePdsTestSuite3.suite(ignoredTests);
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java
index 78c9480..b7b36d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java
@@ -20,6 +20,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCancelingTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPartitionPreloadTest;
 import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.PageLockTrackerManagerTest;
@@ -53,6 +54,9 @@ public class IgnitePdsMvccTestSuite4 {
         ignoredTests.add(FileDownloaderTest.class);
         ignoredTests.add(IgnitePdsTaskCancelingTest.class);
 
+        // TODO https://issues.apache.org/jira/browse/IGNITE-11937
+        ignoredTests.add(IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes.class);
+
         // Skip page lock tracker tests for MVCC suite.
         ignoredTests.add(PageLockTrackerManagerTest.class);
         ignoredTests.add(SharedPageLockTrackerTest.class);