You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2020/01/17 15:38:46 UTC

[ignite] branch master updated: IGNITE-12551 Partition desync if a partition is evicted then owned again and historically rebalanced. - Fixes #7270.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e95f65  IGNITE-12551 Partition desync if a partition is evicted then owned again and historically rebalanced. - Fixes #7270.
5e95f65 is described below

commit 5e95f65bbc1d3c24a0d094745ab5790be712feac
Author: Aleksei Scherbakov <al...@gmail.com>
AuthorDate: Fri Jan 17 18:38:13 2020 +0300

    IGNITE-12551 Partition desync if a partition is evicted then owned again and historically rebalanced. - Fixes #7270.
    
    Signed-off-by: Aleksei Scherbakov <as...@apache.org>
---
 .../cache/GridCachePartitionExchangeManager.java   | 13 +++
 .../cache/IgniteCacheOffheapManager.java           |  2 +-
 .../PartitionTxUpdateCounterDebugWrapper.java      | 17 ++++
 .../cache/PartitionTxUpdateCounterImpl.java        |  2 +
 .../dht/preloader/GridDhtPreloader.java            |  4 +
 .../dht/topology/GridDhtLocalPartition.java        | 53 +++---------
 .../dht/topology/GridDhtPartitionTopologyImpl.java | 31 ++++++-
 .../dht/topology/PartitionsEvictManager.java       |  6 +-
 .../cache/transactions/IgniteTxHandler.java        | 14 ++--
 ...acheRemoteMultiplePartitionReservationTest.java |  2 -
 .../TxPartitionCounterStateAbstractTest.java       | 31 ++++++-
 ...ounterStateConsistencyHistoryRebalanceTest.java | 98 ++++++++++++++++++++++
 .../TxPartitionCounterStateConsistencyTest.java    | 24 ++++--
 ...unterStateConsistencyVolatileRebalanceTest.java | 21 +++++
 ...ounterStateOnePrimaryTwoBackupsFailAllTest.java |  7 +-
 15 files changed, 252 insertions(+), 73 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 67a62ca..eba10ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -279,6 +279,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** Histogram of blocking PME durations. */
     private volatile HistogramMetric blockingDurationHistogram;
 
+    /** Delay before rebalancing code is start executing after exchange completion. For tests only. */
+    private volatile long rebalanceDelay;
+
     /** Discovery listener. */
     private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
         @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -2479,6 +2482,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * @param delay Rebalance delay.
+     */
+    public void rebalanceDelay(long delay) {
+        this.rebalanceDelay = delay;
+    }
+
+    /**
      * For testing only.
      *
      * @return Current version to wait for.
@@ -3307,6 +3317,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             rebTopVer = NONE;
 
                         if (!cctx.kernalContext().clientNode() && rebTopVer.equals(NONE)) {
+                            if (rebalanceDelay > 0)
+                                U.sleep(rebalanceDelay);
+
                             assignsMap = new HashMap<>();
 
                             IgniteCacheSnapshotManager snp = cctx.snapshot();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index e73ad52..51fd9cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -1086,7 +1086,7 @@ public interface IgniteCacheOffheapManager {
         public void preload() throws IgniteCheckedException;
 
         /**
-         * Reset counters for partition.
+         * Reset counter for partition.
          */
         void resetUpdateCounter();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterDebugWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterDebugWrapper.java
index a63479b..3bf92aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterDebugWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterDebugWrapper.java
@@ -198,4 +198,21 @@ public class PartitionTxUpdateCounterDebugWrapper extends PartitionTxUpdateCount
 
         return updated;
     }
+
+    @Override public synchronized void reset() {
+        SB sb = new SB();
+
+        sb.a("[op=reset" +
+            ", grpId=" + grp.groupId() +
+            ", partId=" + partId +
+            ", before=" + toString());
+
+        try {
+            super.reset();
+        }
+        finally {
+            log.debug(sb.a(", after=" + toString() +
+                ']').toString());
+        }
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java
index 03293c8..06692d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionTxUpdateCounterImpl.java
@@ -325,6 +325,8 @@ public class PartitionTxUpdateCounterImpl implements PartitionUpdateCounter {
 
     /** {@inheritDoc} */
     @Override public synchronized void reset() {
+        initCntr = 0;
+
         cntr.set(0);
 
         reserveCntr.set(0);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index fc2a1b6..2ab4d90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -239,6 +239,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                     part.awaitDestroy();
 
                     part = top.localPartition(p, topVer, true);
+
+                    assert part != null : "Partition was not created [grp=" + grp.name() + ", topVer=" + topVer + ", p=" + p + ']';
+
+                    part.resetUpdateCounter();
                 }
 
                 assert part.state() == MOVING : "Partition has invalid state for rebalance " + aff.topologyVersion() + " " + part;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index a7175e0..a45aa92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -786,54 +786,24 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /**
-     * @return {@code true} if evicting thread was added.
+     * @return {@code True} if evicting thread was added.
      */
     private boolean addEvicting() {
-        while (true) {
-            int cnt = evictGuard.get();
-
-            if (cnt != 0)
-                return false;
-
-            if (evictGuard.compareAndSet(cnt, cnt + 1))
-                return true;
-        }
+        return evictGuard.compareAndSet(0, 1);
     }
 
     /**
-     * @return {@code true} if no thread evicting partition at the moment.
+     * Clears evicting flag.
      */
-    private boolean clearEvicting() {
-        boolean free;
-
-        while (true) {
-            int cnt = evictGuard.get();
-
-            assert cnt > 0;
-
-            if (evictGuard.compareAndSet(cnt, cnt - 1)) {
-                free = cnt == 1;
-
-                break;
-            }
-        }
-
-        return free;
+    private void clearEvicting() {
+        evictGuard.set(0);
     }
 
     /**
-     * @return {@code True} if partition is safe to destroy.
+     * @return {@code True} if partition is marked for destroy.
      */
     public boolean markForDestroy() {
-        while (true) {
-            int cnt = evictGuard.get();
-
-            if (cnt != 0)
-                return false;
-
-            if (evictGuard.compareAndSet(0, -1))
-                return true;
-        }
+        return evictGuard.compareAndSet(0, -1);
     }
 
     /**
@@ -947,10 +917,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
                 throw e;
             }
             finally {
-                boolean free = clearEvicting();
+                clearEvicting();
 
-                if (free)
-                    clearFuture.finish();
+                clearFuture.finish();
             }
         }
 
@@ -1108,9 +1077,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /**
-     * Reset partition counters.
+     * Reset partition update counter.
      */
-    public void resetCounters() {
+    public void resetUpdateCounter() {
         store.resetUpdateCounter();
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 50f8a19..5ee3243 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -882,12 +882,20 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         GridDhtLocalPartition loc = locParts.get(p);
 
         if (loc == null || loc.state() == EVICTED) {
+            boolean recreate = false;
+
             // Make sure that after eviction partition is destroyed.
-            if (loc != null)
+            if (loc != null) {
                 loc.awaitDestroy();
 
+                recreate = true;
+            }
+
             locParts.set(p, loc = partFactory.create(ctx, grp, p));
 
+            if (recreate)
+                loc.resetUpdateCounter();
+
             long updCntr = cntrMap.updateCounter(p);
 
             if (updCntr != 0)
@@ -914,15 +922,23 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         try {
             GridDhtLocalPartition part = locParts.get(p);
 
+            boolean recreate = false;
+
             if (part != null) {
                 if (part.state() != EVICTED)
                     return part;
-                else
+                else {
                     part.awaitDestroy();
+
+                    recreate = true;
+                }
             }
 
             part = new GridDhtLocalPartition(ctx, grp, p, true);
 
+            if (recreate)
+                part.resetUpdateCounter();
+
             locParts.set(p, part);
 
             return part;
@@ -970,10 +986,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                 boolean belongs = partitionLocalNode(p, topVer);
 
+                boolean recreate = false;
+
                 if (loc != null && state == EVICTED) {
                     // Make sure that after eviction partition is destroyed.
                     loc.awaitDestroy();
 
+                    recreate = true;
+
                     locParts.set(p, loc = null);
 
                     if (!belongs) {
@@ -1000,6 +1020,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                     locParts.set(p, loc = partFactory.create(ctx, grp, p));
 
+                    if (recreate)
+                        loc.resetUpdateCounter();
+
                     this.updateSeq.incrementAndGet();
 
                     created = true;
@@ -2208,7 +2231,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                 updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer);
 
                                 // Reset counters to zero for triggering full rebalance.
-                                locPart.resetCounters();
+                                locPart.resetUpdateCounter();
                             }
                         }
                     }
@@ -2833,8 +2856,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 if (part == null)
                     continue;
 
-                long updCntr = part.updateCounter();
                 long initCntr = part.initialUpdateCounter();
+                long updCntr = part.updateCounter();
 
                 if (skipZeros && initCntr == 0L && updCntr == 0L)
                     continue;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
index 31399bd..3c4b1e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
@@ -416,10 +416,8 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
             try {
                 boolean success = part.tryClear(grpEvictionCtx);
 
-                if (success) {
-                    if (part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy())
-                        part.destroy();
-                }
+                if (success && part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy())
+                    part.destroy();
 
                 // Complete eviction future before schedule new to prevent deadlock with
                 // simultaneous eviction stopping and scheduling new eviction.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 42e3c19..5eee8ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -66,7 +66,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeVal
 import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture;
@@ -109,6 +108,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
 import static org.apache.ignite.internal.util.lang.GridFunc.isEmpty;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
@@ -1767,8 +1767,12 @@ public class IgniteTxHandler {
                         try {
                             tx.addWrite(entry, ctx.deploy().globalLoader());
 
+                            // Entry will be invalidated if a partition was moved to RENTING.
+                            if (locPart.state() == RENTING)
+                                continue;
+
                             if (txCounters != null) {
-                                Long cntr = txCounters.generateNextCounter(entry.cacheId(), entry.cached().partition());
+                                Long cntr = txCounters.generateNextCounter(entry.cacheId(), part);
 
                                 if (cntr != null) // Counter is null if entry is no-op.
                                     entry.updateCounter(cntr);
@@ -1910,8 +1914,8 @@ public class IgniteTxHandler {
 
                 if (locPart != null && locPart.reserve()) {
                     try {
-                        // do not process renting partitions.
-                        if (locPart.state() == GridDhtPartitionState.RENTING) {
+                        // Skip renting partitions.
+                        if (locPart.state() == RENTING) {
                             tx.addInvalidPartition(ctx.cacheId(), part);
 
                             continue;
@@ -2308,7 +2312,7 @@ public class IgniteTxHandler {
 
                         if (part != null && part.reserve()) {
                             try {
-                                if (part.state() != GridDhtPartitionState.RENTING) { // Check is actual only for backup node.
+                                if (part.state() != RENTING) { // Check is actual only for backup node.
                                     long start = counter.initialCounter(i);
                                     long delta = counter.updatesCount(i);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheRemoteMultiplePartitionReservationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheRemoteMultiplePartitionReservationTest.java
index 015f3ed..c6c269f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheRemoteMultiplePartitionReservationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheRemoteMultiplePartitionReservationTest.java
@@ -59,8 +59,6 @@ public class TxCrossCacheRemoteMultiplePartitionReservationTest extends GridComm
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        cfg.setClientMode(igniteInstanceName.equals("client"));
-
         cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
         cfg.setClientMode("client".equals(igniteInstanceName));
         cfg.setCacheConfiguration(cacheConfiguration(CACHE1), cacheConfiguration(CACHE2));
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateAbstractTest.java
index c32bd6b..773ddde 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateAbstractTest.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
+import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
@@ -60,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -147,11 +149,18 @@ public abstract class TxPartitionCounterStateAbstractTest extends GridCommonAbst
         return cfg;
     }
 
+    /**
+     * @return Partitions count.
+     */
+    protected int partitions() {
+        return PARTS_CNT;
+    }
+
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
-        System.setProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED, "false");
-
         super.beforeTestsStarted();
+
+        System.setProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED, "false");
     }
 
     /** {@inheritDoc} */
@@ -176,7 +185,7 @@ public abstract class TxPartitionCounterStateAbstractTest extends GridCommonAbst
         ccfg.setBackups(backups);
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
         ccfg.setOnheapCacheEnabled(false);
-        ccfg.setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT));
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, partitions()));
 
         return ccfg;
     }
@@ -1096,6 +1105,22 @@ public abstract class TxPartitionCounterStateAbstractTest extends GridCommonAbst
         }
     }
 
+    /**
+     * Blocks tx recovery between all nodes.
+     */
+    protected void blockRecovery() {
+        for (Ignite grid : G.allGrids()) {
+            if (grid.configuration().isClientMode())
+                continue;
+
+            TestRecordingCommunicationSpi.spi(grid).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+                @Override public boolean apply(ClusterNode clusterNode, Message msg) {
+                    return msg instanceof GridCacheTxRecoveryRequest;
+                }
+            });
+        }
+    }
+
     /** */
     private enum TxState {
         /** Prepare. */ PREPARE,
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java
index 6cfa21b..ce23983 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java
@@ -17,8 +17,15 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.junit.Test;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
 
 /**
@@ -26,4 +33,95 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_
  */
 @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
 public class TxPartitionCounterStateConsistencyHistoryRebalanceTest extends TxPartitionCounterStateConsistencyTest {
+    /** */
+    @Test
+    public void testConsistencyAfterBaselineNodeStopAndRemoval() throws Exception {
+        doTestConsistencyAfterBaselineNodeStopAndRemoval(0);
+    }
+
+    /** */
+    @Test
+    public void testConsistencyAfterBaselineNodeStopAndRemoval_WithRestart() throws Exception {
+        doTestConsistencyAfterBaselineNodeStopAndRemoval(1);
+    }
+
+    /** */
+    @Test
+    public void testConsistencyAfterBaselineNodeStopAndRemoval_WithRestartAndSkipCheckpoint() throws Exception {
+        doTestConsistencyAfterBaselineNodeStopAndRemoval(2);
+    }
+
+    /**
+     * Test a scenario when partition is evicted and owned again with non-zero initial and current counters.
+     * Such partition should not be historically rebalanced, otherwise only subset of data will be rebalanced.
+     */
+    private void doTestConsistencyAfterBaselineNodeStopAndRemoval(int mode) throws Exception {
+        backups = 2;
+
+        final int srvNodes = SERVER_NODES + 1;
+
+        IgniteEx prim = startGrids(srvNodes);
+
+        prim.cluster().active(true);
+
+        for (int p = 0; p < partitions(); p++) {
+            prim.cache(DEFAULT_CACHE_NAME).put(p, p);
+            prim.cache(DEFAULT_CACHE_NAME).put(p + PARTS_CNT, p * 2);
+        }
+
+        forceCheckpoint();
+
+        stopGrid(1); // topVer=5,0
+
+        awaitPartitionMapExchange();
+
+        resetBaselineTopology(); // topVer=5,1
+
+        awaitPartitionMapExchange();
+
+        forceCheckpoint(); // Will force GridCacheDataStore.exists=true mode after part store re-creation.
+
+        startGrid(1); // topVer=6,0
+
+        awaitPartitionMapExchange();
+
+        resetBaselineTopology(); // topVer=6,1
+
+        awaitPartitionMapExchange(true, true, null);
+
+        // Create counter difference with evicted partition so it's applicable for historical rebalancing.
+        for (int p = 0; p < partitions(); p++)
+            prim.cache(DEFAULT_CACHE_NAME).put(p + PARTS_CNT, p * 2 + 1);
+
+        stopGrid(1); // topVer=7,0
+
+        if (mode > 0) {
+            stopGrid(mode == 1, grid(2).name());
+            stopGrid(mode == 1, grid(3).name());
+
+            startGrid(2);
+            startGrid(3);
+        }
+
+        prim.context().cache().context().exchange().rebalanceDelay(500);
+
+        Random r = new Random();
+
+        AtomicBoolean stop = new AtomicBoolean();
+
+        final IgniteInternalFuture<?> fut = doRandomUpdates(r,
+            prim,
+            IntStream.range(0, 1000).boxed().collect(toList()),
+            prim.cache(DEFAULT_CACHE_NAME),
+            stop::get);
+
+        resetBaselineTopology(); // topVer=7,1
+
+        awaitPartitionMapExchange();
+
+        stop.set(true);
+        fut.get();
+
+        assertPartitionsSame(idleVerify(prim, DEFAULT_CACHE_NAME));
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
index 0fa28a7..32e2ad6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.stream.IntStream;
 
@@ -189,7 +190,7 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
             }
         }, 1, "node-restarter");
 
-        doRandomUpdates(r, client, primaryKeys, cache, stop).get(stop + 30_000);
+        doRandomUpdates(r, client, primaryKeys, cache, () -> U.currentTimeMillis() >= stop).get(stop + 30_000);
         fut.get();
 
         assertPartitionsSame(idleVerify(client, DEFAULT_CACHE_NAME));
@@ -253,7 +254,7 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
             }
         }, 1, "node-restarter");
 
-        doRandomUpdates(r, prim, primaryKeys, cache, stop).get(stop + 30_000);
+        doRandomUpdates(r, prim, primaryKeys, cache, () -> U.currentTimeMillis() >= stop).get(stop + 30_000);
         fut.get();
 
         assertPartitionsSame(idleVerify(prim, DEFAULT_CACHE_NAME));
@@ -329,7 +330,7 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
         }, 1, "node-restarter");
 
         // Wait with timeout to avoid hanging suite.
-        doRandomUpdates(r, prim, primaryKeys, cache, stop).get(stop + 30_000);
+        doRandomUpdates(r, prim, primaryKeys, cache, () -> U.currentTimeMillis() >= stop).get(stop + 30_000);
         fut.get();
 
         assertPartitionsSame(idleVerify(prim, DEFAULT_CACHE_NAME));
@@ -702,7 +703,7 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
 
         // Put one key per partition.
         try(IgniteDataStreamer<Object, Object> streamer = client.dataStreamer(DEFAULT_CACHE_NAME)) {
-            for (int k = 0; k < PARTS_CNT; k++)
+            for (int k = 0; k < partitions(); k++)
                 streamer.addData(k, 0);
         }
 
@@ -839,7 +840,7 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
         IgniteCache<Object, Object> cache2 = client.getOrCreateCache(cacheConfiguration(DEFAULT_CACHE_NAME + "2"));
 
         // Put one key per partition.
-        for (int k = 0; k < PARTS_CNT; k++) {
+        for (int k = 0; k < partitions(); k++) {
             cache.put(k, 0);
             cache2.put(k, 0);
         }
@@ -973,18 +974,23 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
      * @param near Near node.
      * @param primaryKeys Primary keys.
      * @param cache Cache.
-     * @param stop Time to stop.
+     * @param stopClo A closure providing stop condition.
      * @return Finish future.
      */
-    private IgniteInternalFuture<?> doRandomUpdates(Random r, Ignite near, List<Integer> primaryKeys,
-        IgniteCache<Object, Object> cache, long stop) throws Exception {
+    protected IgniteInternalFuture<?> doRandomUpdates(
+        Random r,
+        Ignite near,
+        List<Integer> primaryKeys,
+        IgniteCache<Object, Object> cache,
+        BooleanSupplier stopClo
+    ) throws Exception {
         LongAdder puts = new LongAdder();
         LongAdder removes = new LongAdder();
 
         final int max = 100;
 
         return multithreadedAsync(() -> {
-            while (U.currentTimeMillis() < stop) {
+            while (!stopClo.getAsBoolean()) {
                 int rangeStart = r.nextInt(primaryKeys.size() - max);
                 int range = 5 + r.nextInt(max - 5);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyVolatileRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyVolatileRebalanceTest.java
index ac3d0a0..b3ef2d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyVolatileRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyVolatileRebalanceTest.java
@@ -22,6 +22,8 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.junit.Ignore;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_ENABLED;
+
 /**
  * Test partitions consistency in various scenarios when all rebalance is in-memory.
  */
@@ -32,6 +34,20 @@ public class TxPartitionCounterStateConsistencyVolatileRebalanceTest extends TxP
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        System.setProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED, "true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        System.clearProperty(IGNITE_BASELINE_AUTO_ADJUST_ENABLED);
+    }
+
+    /** {@inheritDoc} */
     @Ignore
     @Override public void testSingleThreadedUpdateOrder() throws Exception {
         // Not applicable for volatile mode.
@@ -47,4 +63,9 @@ public class TxPartitionCounterStateConsistencyVolatileRebalanceTest extends TxP
     @Override protected void forceCheckpoint(Collection<Ignite> nodes) throws IgniteCheckedException {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override protected int partitions() {
+        return 1024;
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java
index d9fcd93..00e3256 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest.java
@@ -297,10 +297,11 @@ public class TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest extends TxPa
                                 }
 
                                 if (backup == backup1) {
-                                    // Stop all backups first or recovery will commit a transaction on backups.
+                                    blockRecovery();
+
                                     stopGrid(skipCheckpoint, txTop.get2().get(0).name());
                                     stopGrid(skipCheckpoint, txTop.get2().get(1).name());
-                                    stopAllGrids();
+                                    stopAllGrids(); // Stop all remaining nodes.
                                 }
 
                                 return true;
@@ -320,7 +321,7 @@ public class TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest extends TxPa
             System.setProperty(IGNITE_FAIL_NODE_ON_UNRECOVERABLE_PARTITION_INCONSISTENCY, "true");
 
         try {
-            // Start only backups.
+            // Start only backups in given order.
             startGrid(txTop.get(PARTITION_ID).get2().get(backupsStartOrder[0]).name());
             startGrid(txTop.get(PARTITION_ID).get2().get(backupsStartOrder[1]).name());