You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/11/01 17:03:26 UTC

[ignite] branch master updated: IGNITE-15848 Fixed consistency issue for persistence atomic cache. Fixes #9541

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c189a9  IGNITE-15848 Fixed consistency issue for persistence atomic cache. Fixes #9541
5c189a9 is described below

commit 5c189a932d53a5c971d859b6ecd1098ad1da831c
Author: sergeyuttsel <ut...@gmail.com>
AuthorDate: Mon Nov 1 20:02:53 2021 +0300

    IGNITE-15848 Fixed consistency issue for persistence atomic cache. Fixes #9541
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../wal/record/PartitionClearingStartRecord.java   | 98 ++++++++++++++++++++++
 .../internal/pagemem/wal/record/WALRecord.java     |  5 +-
 .../dht/topology/GridDhtLocalPartition.java        | 26 +++++-
 .../dht/topology/PartitionsEvictManager.java       | 32 +++++--
 .../GridCacheDatabaseSharedManager.java            | 47 +++++++++++
 .../wal/serializer/RecordDataV1Serializer.java     | 24 ++++++
 .../wal/serializer/RecordDataV2Serializer.java     |  3 +
 .../TxPartitionCounterStateConsistencyTest.java    | 91 ++++++++++++++++++++
 .../testframework/wal/record/RecordUtils.java      |  8 ++
 9 files changed, 322 insertions(+), 12 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PartitionClearingStartRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PartitionClearingStartRecord.java
new file mode 100644
index 0000000..7f88bd7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PartitionClearingStartRecord.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * WAL record written when a partition clearing starts; holds the partition ID, cache group ID and clear version.
+ * Used to repeat the clearing if the node was stopped without a checkpoint after clearing on a rebalance.
+ */
+public class PartitionClearingStartRecord extends WALRecord {
+    /** ID of the partition being cleared. */
+    private int partId;
+
+    /** ID of the cache group the partition belongs to. */
+    private int grpId;
+
+    /** Clear version recorded for this clearing. */
+    private long clearVer;
+
+    /**
+     * @param partId ID of the partition being cleared.
+     * @param grpId ID of the cache group the partition belongs to.
+     * @param clearVer Clear version recorded for this clearing.
+     */
+    public PartitionClearingStartRecord(int partId, int grpId, long clearVer) {
+        this.partId = partId;
+        this.grpId = grpId;
+        this.clearVer = clearVer;
+    }
+
+    /**
+     * @return ID of the partition being cleared.
+     */
+    public int partitionId() {
+        return partId;
+    }
+
+    /**
+     * @param partId New partition ID.
+     */
+    public void partitionId(int partId) {
+        this.partId = partId;
+    }
+
+    /**
+     * @return ID of the cache group the partition belongs to.
+     */
+    public int groupId() {
+        return grpId;
+    }
+
+    /**
+     * @param grpId New cache group ID.
+     */
+    public void groupId(int grpId) {
+        this.grpId = grpId;
+    }
+
+    /**
+     * @return Clear version recorded for this clearing.
+     */
+    public long clearVersion() {
+        return clearVer;
+    }
+
+    /**
+     * @param clearVer New clear version.
+     */
+    public void clearVersion(long clearVer) {
+        this.clearVer = clearVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.PARTITION_CLEARING_START_RECORD;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(PartitionClearingStartRecord.class, this, "super", super.toString());
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 208d687..f3f9f2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -273,7 +273,10 @@ public abstract class WALRecord {
         ENCRYPTED_DATA_RECORD_V3(71, LOGICAL),
 
         /** Record for renaming the index root pages. */
-        INDEX_ROOT_PAGE_RENAME_RECORD(72, LOGICAL);
+        INDEX_ROOT_PAGE_RENAME_RECORD(72, LOGICAL),
+
+        /** Partition clearing start. */
+        PARTITION_CLEARING_START_RECORD(73, LOGICAL);
 
         /** Index for serialization. Should be consistent throughout all versions. */
         private final int idx;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index ab27399..ec435d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -39,6 +39,7 @@ import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.pagemem.wal.record.PartitionClearingStartRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
@@ -77,6 +78,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED;
 import static org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
@@ -659,6 +661,14 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * Sets the clear version read from a {@link PartitionClearingStartRecord} to repeat a clearing after node restart.
+     * @param clearVer Clear version.
+     */
+    public void updateClearVersion(long clearVer) {
+        this.clearVer = clearVer;
+    }
+
+    /**
      * @return {@code True} if partition state changed.
      */
     public boolean markLost() {
@@ -983,7 +993,18 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
 
         CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
 
+        boolean recoveryMode = ctx.kernalContext().recoveryMode();
+
         try {
+            // If a partition was not checkpointed after clearing on a rebalance and the node was stopped,
+            // the clearing must be repeated on node start. So a partition clearing start record is written here
+            // and the clearing is repeated while applying updates from WAL if the record is read.
+            // This is needed for atomic caches only. A transactional cache starts a rebalance in this case due to
+            // an outdated counter, since atomic and transactional caches implement partition counters differently.
+            if (state() == MOVING && !recoveryMode && grp.walEnabled() &&
+                grp.config().getAtomicityMode() == ATOMIC)
+                ctx.wal().log(new PartitionClearingStartRecord(id, grp.groupId(), order));
+
             GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id);
 
             while (it0.hasNext()) {
@@ -1002,7 +1023,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
                     // Partition state can be switched from RENTING to MOVING and vice versa during clearing.
                     long order0 = row.version().order();
 
-                    if (state() == MOVING && (order0 == 0 /** Inserted by isolated updater. */ || order0 > order))
+                    if ((state() == MOVING || recoveryMode) && (order0 == 0 /** Inserted by isolated updater. */ || order0 > order))
                         continue;
 
                     if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId()))
@@ -1067,7 +1088,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
             }
 
             // Attempt to destroy.
-            ((GridDhtPreloader)grp.preloader()).tryFinishEviction(this);
+            if (!recoveryMode)
+                ((GridDhtPreloader)grp.preloader()).tryFinishEviction(this);
         }
         catch (NodeStoppingException e) {
             if (log.isDebugEnabled())
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
index 5e15ad8..f6a4aae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
@@ -35,6 +35,7 @@ import org.apache.ignite.SystemProperty;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
 import org.apache.ignite.internal.processors.cache.CacheStoppedException;
@@ -141,11 +142,13 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
             GroupEvictionContext grpEvictionCtx = evictionGroupsMap.computeIfAbsent(
                 grpId, k -> new GroupEvictionContext(grp));
 
-            EvictReason reason = part.state() == RENTING ? EvictReason.EVICTION : EvictReason.CLEARING;
+            EvictReason reason = context().kernalContext().recoveryMode() ? EvictReason.CLEARING_ON_RECOVERY :
+                part.state() == RENTING ? EvictReason.EVICTION : EvictReason.CLEARING;
 
             if (log.isDebugEnabled())
                 log.debug("The partition has been scheduled for clearing [grp=" + grp.cacheOrGroupName()
-                    + ", topVer=" + grp.topology().readyTopologyVersion()
+                    + ", topVer=" + (cctx.kernalContext().recoveryMode() ?
+                    AffinityTopologyVersion.NONE : grp.topology().readyTopologyVersion())
                     + ", id=" + part.id() + ", state=" + part.state()
                     + ", fullSize=" + part.fullSize() + ", reason=" + reason + ']');
 
@@ -410,7 +413,8 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
 
                 if (log.isDebugEnabled()) {
                     log.debug("The partition has been cleared [grp=" + part.group().cacheOrGroupName() +
-                        ", topVer=" + part.group().topology().readyTopologyVersion() +
+                        ", topVer=" + (cctx.kernalContext().recoveryMode() ?
+                        AffinityTopologyVersion.NONE : part.group().topology().readyTopologyVersion()) +
                         ", id=" + part.id() + ", state=" + part.state() + ", cleared=" + clearedEntities +
                         ", fullSize=" + part.fullSize() + ']');
                 }
@@ -425,7 +429,9 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
                 if (cctx.kernalContext().isStopping()) {
                     LT.warn(log, ex, "Partition eviction has been cancelled (local node is stopping) " +
                         "[grp=" + grpEvictionCtx.grp.cacheOrGroupName() +
-                        ", readyVer=" + grpEvictionCtx.grp.topology().readyTopologyVersion() + ']',
+                        ", readyVer=" + (cctx.kernalContext().recoveryMode() ?
+                            AffinityTopologyVersion.NONE : grpEvictionCtx.grp.topology().readyTopologyVersion()) +
+                        ']',
                         false,
                         true);
                 }
@@ -456,7 +462,13 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
          * Partition evicted after changing to
          * {@link GridDhtPartitionState#MOVING MOVING} state.
          */
-        CLEARING;
+        CLEARING,
+
+        /**
+         * Partition clearing on logical WAL recovery.
+         * Used to repeat partition clearing if the node was stopped before the previous clearing was checkpointed.
+         */
+        CLEARING_ON_RECOVERY;
 
         /** {@inheritDoc} */
         @Override public String toString() {
@@ -469,11 +481,13 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
      * @param c Update closure.
      */
     private void updateMetrics(CacheGroupContext grp, EvictReason reason, BiConsumer<EvictReason, CacheMetricsImpl> c) {
-        for (GridCacheContext cctx : grp.caches()) {
-            if (cctx.statisticsEnabled()) {
-                final CacheMetricsImpl metrics = cctx.cache().metrics0();
+        if (reason != EvictReason.CLEARING_ON_RECOVERY) {
+            for (GridCacheContext cctx : grp.caches()) {
+                if (cctx.statisticsEnabled()) {
+                    final CacheMetricsImpl metrics = cctx.cache().metrics0();
 
-                c.accept(reason, metrics);
+                    c.accept(reason, metrics);
+                }
             }
         }
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 45edca2..48c84de 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
 import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.PartitionClearingStartRecord;
 import org.apache.ignite.internal.pagemem.wal.record.ReencryptionStartRecord;
 import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
 import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
@@ -147,6 +148,7 @@ import org.apache.ignite.internal.util.GridCountDownCallback;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.TimeBag;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridInClosure3X;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -375,10 +377,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     private final SimpleDistributedProperty<Integer> historicalRebalanceThreshold =
         new SimpleDistributedProperty<>(HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY, Integer::parseInt);
 
+    /** */
+    private GridKernalContext ctx;
+
     /**
      * @param ctx Kernal context.
      */
     public GridCacheDatabaseSharedManager(GridKernalContext ctx) {
+        this.ctx = ctx;
+
         IgniteConfiguration cfg = ctx.config();
 
         persistenceCfg = cfg.getDataStorageConfiguration();
@@ -2829,6 +2836,46 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         }
 
                         break;
+
+                    case PARTITION_CLEARING_START_RECORD:
+                        PartitionClearingStartRecord rec0 = (PartitionClearingStartRecord) rec;
+
+                        CacheGroupContext grp = this.ctx.cache().cacheGroup(rec0.groupId());
+
+                        if (grp != null) {
+                            GridDhtLocalPartition part;
+
+                            try {
+                                part = grp.topology().forceCreatePartition(rec0.partitionId());
+                            }
+                            catch (IgniteCheckedException e) {
+                                throw new IgniteException("Cannot get or create a partition [groupId=" + rec0.groupId() +
+                                    ", partitionId=" + rec0.partitionId() + "]", e);
+                            }
+
+                            stripedApply(() -> {
+                                try {
+                                    part.updateClearVersion(rec0.clearVersion());
+
+                                    IgniteInternalFuture<?> clearFut = grp.
+                                        shared().
+                                        evict().
+                                        evictPartitionAsync(grp, part, new GridFutureAdapter<>());
+
+                                    clearFut.get();
+
+                                    part.updateClearVersion();
+                                }
+                                catch (IgniteCheckedException e) {
+                                    U.error(log, "Failed to apply partition clearing record, " + rec0);
+
+                                    applyError.compareAndSet(null, e);
+                                }
+                            }, rec0.groupId(), rec0.partitionId(), exec, semaphore);
+                        }
+
+                        break;
+
                     default:
                         // Skip other records.
                 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index 4c1a8bc..83c882c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.PartitionClearingStartRecord;
 import org.apache.ignite.internal.pagemem.wal.record.ReencryptionStartRecord;
 import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
@@ -569,6 +570,9 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
             case INDEX_ROOT_PAGE_RENAME_RECORD:
                 return ((IndexRenameRootPageRecord) record).dataSize();
 
+            case PARTITION_CLEARING_START_RECORD:
+                return 4 + 4 + 8;
+
             default:
                 throw new UnsupportedOperationException("Type: " + record.type());
         }
@@ -1278,6 +1282,15 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
 
                 break;
 
+            case PARTITION_CLEARING_START_RECORD:
+                int partId0 = in.readInt();
+                int grpId = in.readInt();
+                long clearVer = in.readLong();
+
+                res = new PartitionClearingStartRecord(partId0, grpId, clearVer);
+
+                break;
+
             default:
                 throw new UnsupportedOperationException("Type: " + type);
         }
@@ -1910,6 +1923,17 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
 
                 break;
 
+            case PARTITION_CLEARING_START_RECORD:
+                PartitionClearingStartRecord partitionClearingStartRecord = (PartitionClearingStartRecord)rec;
+
+                buf.putInt(partitionClearingStartRecord.partitionId());
+
+                buf.putInt(partitionClearingStartRecord.groupId());
+
+                buf.putLong(partitionClearingStartRecord.clearVersion());
+
+                break;
+
             default:
                 throw new UnsupportedOperationException("Type: " + rec.type());
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index 56568cd..2637d93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -114,6 +114,9 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
             case TRACKING_PAGE_REPAIR_DELTA:
                 return 4 + 8;
 
+            case PARTITION_CLEARING_START_RECORD:
+                return 4 + 4 + 8;
+
             default:
                 return super.plainSize(rec);
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
index 2a71918..eefbcc1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
@@ -169,6 +169,97 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
     }
 
     /**
+     * A key is removed while the primary node is stopped.
+     * After the primary node restarts, a full rebalance with clearing takes place,
+     * so the remove operation is not written to WAL. The partition is also not checkpointed.
+     * After the second restart the primary node needs to repeat the partition clearing to clear the key.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPartitionConsistencyNotRebalancedRemoveOpWithPrimaryRestart() throws Exception {
+        testPartitionConsistencyNotRebalancedRemoveOpWithNodeRestart(true);
+    }
+
+    /**
+     * A key is removed while the backup node is stopped.
+     * After the backup node restarts, a full rebalance with clearing takes place,
+     * so the remove operation is not written to WAL. The partition is also not checkpointed.
+     * After the second restart the backup node needs to repeat the partition clearing to clear the key.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPartitionConsistencyNotRebalancedRemoveOpWithBackupRestart() throws Exception {
+        testPartitionConsistencyNotRebalancedRemoveOpWithNodeRestart(false);
+    }
+
+    /**
+     * @param primary {@code true} to use keys the restarted node is primary for, {@code false} to use its backup keys.
+     * @throws Exception If failed.
+     */
+    public void testPartitionConsistencyNotRebalancedRemoveOpWithNodeRestart(boolean primary) throws Exception {
+        backups = 1;
+
+        Ignite srv = startGridsMultiThreaded(2);
+
+        IgniteEx client = startGrid("client");
+
+        IgniteCache<Object, Object> cache = client.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        List<Integer> cacheKeys;
+
+        if (primary)
+            cacheKeys = primaryKeys(srv.cache(DEFAULT_CACHE_NAME), partitions() * 4);
+        else
+            cacheKeys = backupKeys(srv.cache(DEFAULT_CACHE_NAME), partitions() * 4, 0);
+
+        List<Integer> partKeys = new ArrayList<>();
+
+        int partId = -1;
+
+        for (Integer key : cacheKeys) { // Collect three keys that map to the same partition.
+            if (partId == -1 || srv.affinity(DEFAULT_CACHE_NAME).partition(key) == partId) {
+                if (partId == -1)
+                    partId = srv.affinity(DEFAULT_CACHE_NAME).partition(key);
+
+                partKeys.add(key);
+
+                if (partKeys.size() == 3)
+                    break;
+            }
+        }
+
+        assertTrue("Failed to find the required number of keys.", partKeys.size() == 3);
+
+        if (log().isInfoEnabled())
+            log().info("partKeys: " + partKeys);
+
+        for (int i = 0; i < 2; i++) { // Restart the server node twice; the cache is modified only in the first cycle.
+            if (i == 0)
+                cache.put(partKeys.get(0), 1234567);
+
+            stopGrid(true, srv.name());
+
+            awaitPartitionMapExchange();
+
+            if (i == 0) {
+                cache.remove(partKeys.get(0));
+                cache.put(partKeys.get(1), 112233);
+            }
+
+            startGrid(srv.name());
+
+            awaitPartitionMapExchange(true, true, null);
+
+            if (i == 0)
+                cache.put(partKeys.get(2), 7654321);
+        }
+
+        assertPartitionsSame(idleVerify(client, DEFAULT_CACHE_NAME));
+    }
+
+    /**
      * Test primary-backup partitions consistency while restarting primary node under load.
      */
     @Test
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
index 5317b48..d633898 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MvccDataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
 import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.PartitionClearingStartRecord;
 import org.apache.ignite.internal.pagemem.wal.record.ReencryptionStartRecord;
 import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
 import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord;
@@ -147,6 +148,7 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PAGES_LIST_SET_PREVIOUS;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PAGE_LIST_META_RESET_COUNT_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PAGE_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PARTITION_CLEARING_START_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PARTITION_DESTROY;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PARTITION_META_PAGE_DELTA_RECORD_V3;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.PARTITION_META_PAGE_DELTA_RECORD_V4;
@@ -247,6 +249,7 @@ public class RecordUtils {
             put(BTREE_META_PAGE_INIT_ROOT_V3, RecordUtils::buildBtreeMetaPageInitRootV3);
             put(OUT_OF_ORDER_UPDATE, RecordUtils::buildOutOfOrderRecord);
             put(INDEX_ROOT_PAGE_RENAME_RECORD, RecordUtils::buildIndexRenameRootPageRecord);
+            put(PARTITION_CLEARING_START_RECORD, RecordUtils::buildPartitionClearingStartedRecord);
         }};
 
     /** **/
@@ -668,4 +671,9 @@ public class RecordUtils {
             666
         );
     }
+
+    /** @return {@link PartitionClearingStartRecord} built from fixed sample values. */
+    public static PartitionClearingStartRecord buildPartitionClearingStartedRecord() {
+        return new PartitionClearingStartRecord(12, 345, 123456789);
+    }
 }