You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2023/12/28 11:52:02 UTC

(ignite) 01/03: IGNITE-21170 Partition divergence on instable topology in replicated cahce

This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch ignite-21170
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit a7d5e215ce8aa790b27040f694b78dffc3ae30c4
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Wed Dec 27 22:44:44 2023 +0300

    IGNITE-21170 Partition divergence on instable topology in replicated cahce
---
 .../cache/distributed/dht/GridDhtCacheEntry.java   |   6 +-
 .../dht/topology/GridDhtPartitionTopologyImpl.java |  96 +++---
 .../continuous/CacheContinuousQueryHandler.java    |   2 +-
 ...tionCacheConsistencyOnUnstableTopologyTest.java | 336 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite9.java   |   2 +
 5 files changed, 398 insertions(+), 44 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 05de436568e..d7d82868e1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -105,8 +105,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
             return locPart.nextUpdateCounter(cctx.cacheId(), topVer, primary, init, primaryCntr);
         }
         catch (Throwable t) {
-            log.error("Failed to update counter for atomic cache [" +
-                ", initial=" + init +
+            log.error("Failed to update counter for atomic cache [initial=" + init +
                 ", primaryCntr=" + primaryCntr +
                 ", part=" + locPart + ']', t);
 
@@ -120,8 +119,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
             return locPart.nextUpdateCounter(cctx.cacheId(), tx, primaryCntr);
         }
         catch (Throwable t) {
-            log.error("Failed to update counter for tx cache [" +
-                ", primaryCntr=" + primaryCntr +
+            log.error("Failed to update counter for tx cache [primaryCntr=" + primaryCntr +
                 ", part=" + locPart + ", tx=" + CU.txString(tx) + ']', t);
 
             throw t;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 7625fe7e813..394707e4f12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -118,10 +118,16 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** */
     private Set<Integer> lostParts;
 
-    /** */
+    /**
+     * For each partition, the set of nodes that hold it but do not belong to the ideal assignment.
+     * This property is not calculated for replicated caches.
+     */
     private final Map<Integer, Set<UUID>> diffFromAffinity = new HashMap<>();
 
-    /** */
+    /**
+     * The topology version at which the partition map was last updated.
+     * It is also the topology version for which {@code diffFromAffinity} was calculated.
+     */
     private volatile AffinityTopologyVersion diffFromAffinityVer = AffinityTopologyVersion.NONE;
 
     /** Last started exchange version (always >= readyTopVer). */
@@ -317,6 +323,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         return topVer;
     }
 
+    /**
+     * Gets the last topology version where partition distribution is considered as the ideal one.
+     *
+     * @return Affinity topology version.
+     */
+    public AffinityTopologyVersion getRebalancedTopVer() {
+        return rebalancedTopVer;
+    }
+
     /** {@inheritDoc} */
     @Override public AffinityTopologyVersion lastTopologyChangeVersion() {
         AffinityTopologyVersion topVer = this.lastTopChangeVer;
@@ -1613,28 +1628,29 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         ", fullMap=" + fullMapString() + ']');
                 }
 
-                if (exchangeVer == null && !grp.isReplicated() &&
-                        (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) {
-                    AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
+                if (exchangeVer == null && (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) {
+                    if (!grp.isReplicated()) {
+                        AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
 
-                    for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
-                        for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
-                            int p = e0.getKey();
+                        for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
+                            for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
+                                int p = e0.getKey();
 
-                            Set<UUID> diffIds = diffFromAffinity.get(p);
+                                Set<UUID> diffIds = diffFromAffinity.get(p);
 
-                            if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue() == RENTING) &&
-                                !affAssignment.getIds(p).contains(e.getKey())) {
+                                if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue() == RENTING) &&
+                                    !affAssignment.getIds(p).contains(e.getKey())) {
 
-                                if (diffIds == null)
-                                    diffFromAffinity.put(p, diffIds = U.newHashSet(3));
+                                    if (diffIds == null)
+                                        diffFromAffinity.put(p, diffIds = U.newHashSet(3));
 
-                                diffIds.add(e.getKey());
-                            }
-                            else {
-                                if (diffIds != null && diffIds.remove(e.getKey())) {
-                                    if (diffIds.isEmpty())
-                                        diffFromAffinity.remove(p);
+                                    diffIds.add(e.getKey());
+                                }
+                                else {
+                                    if (diffIds != null && diffIds.remove(e.getKey())) {
+                                        if (diffIds.isEmpty())
+                                            diffFromAffinity.remove(p);
+                                    }
                                 }
                             }
                         }
@@ -1926,8 +1942,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 node2part.put(parts.nodeId(), parts);
 
                 // During exchange diff is calculated after all messages are received and affinity initialized.
-                if (exchId == null && !grp.isReplicated()) {
-                    if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) {
+                if (exchId == null && readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) {
+                    if (!grp.isReplicated()) {
                         AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
 
                         // Add new mappings.
@@ -1953,6 +1969,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                 }
                             }
                         }
+                    }
 
                         // Remove obsolete mappings.
                         if (cur != null) {
@@ -1968,8 +1985,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                             }
                         }
 
-                        diffFromAffinityVer = readyTopVer;
-                    }
+                    diffFromAffinityVer = readyTopVer;
                 }
 
                 if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) {
@@ -2020,22 +2036,13 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (fut != null)
                 discoCache = fut.events().discoveryCache();
 
-            if (!grp.isReplicated()) {
-                boolean rebuildDiff = fut == null || fut.localJoinExchange() || fut.serverNodeDiscoveryEvent() ||
-                    fut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || !diffFromAffinityVer.initialized();
-
-                if (rebuildDiff) {
-                    if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0)
-                        rebuildDiff(assignment);
-                }
-                else
-                    diffFromAffinityVer = readyTopVer;
-
-                if (!updateRebalanceVer)
-                    updateRebalanceVersion(assignment.topologyVersion(), assignment.assignment());
-            }
+            if (!grp.isReplicated() && isRebuildDiffAssignmentRequired(fut)) {
+                if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0)
+                    rebuildDiff(assignment);
+            } else
+                diffFromAffinityVer = readyTopVer;
 
-            if (updateRebalanceVer)
+            if (!grp.isReplicated() || updateRebalanceVer)
                 updateRebalanceVersion(assignment.topologyVersion(), assignment.assignment());
 
             // Own orphan moving partitions (having no suppliers).
@@ -2047,6 +2054,17 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         }
     }
 
+    /**
+     * Checks whether the rebuild of the difference from the affinity assignment is required.
+     *
+     * @param fut Exchange future.
+     * @return True if the difference from the affinity assignment must be rebuilt, false otherwise.
+     */
+    private boolean isRebuildDiffAssignmentRequired(@Nullable GridDhtPartitionsExchangeFuture fut) {
+        return fut == null || fut.localJoinExchange() || fut.serverNodeDiscoveryEvent() ||
+            fut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || !diffFromAffinityVer.initialized();
+    }
+
     /**
      * Check if some local moving partitions have no suitable supplier and own them.
      * This can happen if a partition has been created by affinity assignment on new node and no supplier exists.
@@ -3121,7 +3139,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param aff Affinity assignments.
      */
     private void updateRebalanceVersion(AffinityTopologyVersion affVer, List<List<ClusterNode>> aff) {
-        if (!grp.isReplicated() && !affVer.equals(diffFromAffinityVer))
+        if (!affVer.equals(diffFromAffinityVer))
             return;
 
         if (!rebalancedTopVer.equals(readyTopVer)) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 4118e9bd854..19ec83a9bd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -1007,7 +1007,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         GridCacheContext<K, V> cctx = cacheContext(ctx);
 
-        IgniteCache<?, ?> cache = ctx.cache().jcache(cctx.name());
+        IgniteCache<?, ?> cache = ctx.cache().jcache(cacheName);
 
         // Initial query entry or evicted entry. These events should be fired immediately.
         if (internal || e.updateCounter() == -1L)
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/ReplicationCacheConsistencyOnUnstableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/cache/ReplicationCacheConsistencyOnUnstableTopologyTest.java
new file mode 100644
index 00000000000..12e5bb539f7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/ReplicationCacheConsistencyOnUnstableTopologyTest.java
@@ -0,0 +1,336 @@
+/*
+ * Copyright 2023 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+
+/**
+ * The test demonstrates how the switch between topology versions is synchronized across the cluster during
+ * rebalance. Two nodes join the topology simultaneously, and the rebalanced topology version on the entire
+ * cluster is switched only when rebalancing for both nodes finishes.
+ */
+public class ReplicationCacheConsistencyOnUnstableTopologyTest extends GridCommonAbstractTest {
+    /**
+     * Cache mode.
+     */
+    private CacheMode cacheMode;
+
+    /**
+     * Cache write synchronization mode.
+     */
+    private CacheWriteSynchronizationMode writeSynchronizationMode;
+
+    /**
+     * True if the cache read operation can execute on backup replicas.
+     */
+    private boolean readFromBackup;
+
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setConsistentId(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setMaxSize(100L * 1024 * 1024)
+                    .setPersistenceEnabled(true)))
+            .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+                .setAffinity(new RendezvousAffinityFunction(false, 3))
+                .setCacheMode(cacheMode)
+                .setBackups(2)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                .setWriteSynchronizationMode(writeSynchronizationMode)
+                .setReadFromBackup(readFromBackup));
+    }
+
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.afterTest();
+    }
+
+    @Test
+    public void testReplicatedFullSync() throws Exception {
+        process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.FULL_SYNC, false);
+    }
+
+    @Test
+    public void testReplicatedFullSyncReadFromBackup() throws Exception {
+        process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.FULL_SYNC, true);
+    }
+
+    @Test
+    public void testReplicatedPrimarySync() throws Exception {
+        process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.PRIMARY_SYNC, false);
+    }
+
+    @Test
+    public void testReplicatedPrimarySyncReadFromBackup() throws Exception {
+        process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.PRIMARY_SYNC, true);
+    }
+
+    @Test
+    public void testReplicatedFullAsync() throws Exception {
+        process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.FULL_ASYNC, false);
+    }
+
+    @Test
+    public void testReplicatedFullAsyncReadFromBackup() throws Exception {
+        process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.FULL_ASYNC, true);
+    }
+
+    @Test
+    public void testPartitionedFullSync() throws Exception {
+        process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.FULL_SYNC, false);
+    }
+
+    @Test
+    public void testPartitionedFullSyncReadFromBackup() throws Exception {
+        process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.FULL_SYNC, true);
+    }
+
+    @Test
+    public void testPartitionedPrimarySync() throws Exception {
+        process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.PRIMARY_SYNC, false);
+    }
+
+    @Test
+    public void testPartitionedPrimarySyncReadFromBackup() throws Exception {
+        process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.PRIMARY_SYNC, true);
+    }
+
+    @Test
+    public void testPartitionedFullAsync() throws Exception {
+        process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.FULL_ASYNC, false);
+    }
+
+    @Test
+    public void testPartitionedFullAsyncReadFromBackup() throws Exception {
+        process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.FULL_ASYNC, true);
+    }
+
+    /**
+     * Executes a test scenario.
+     *
+     * @param cacheMode                Cache mode.
+     * @param writeSynchronizationMode Cache write synchronization mode.
+     * @param readFromBackup           True if the cache read operation can execute on backup replicas.
+     * @throws Exception If fail.
+     */
+    private void process(
+        CacheMode cacheMode,
+        CacheWriteSynchronizationMode writeSynchronizationMode,
+        boolean readFromBackup
+    ) throws Exception {
+        this.cacheMode = cacheMode;
+        this.writeSynchronizationMode = writeSynchronizationMode;
+        this.readFromBackup = readFromBackup;
+
+        IgniteEx ignite = startGrids(3);
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        awaitPartitionMapExchange();
+
+        assertEquals(0, ignite.cache(DEFAULT_CACHE_NAME).size());
+
+        IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME);
+
+        streamer.allowOverwrite(false);
+
+        for (int i = 0; i < 20; i++) {
+            streamer.addData(i, i);
+        }
+
+        streamer.flush();
+
+        ignite(1).close();
+
+        for (int i = 20; i < 40; i++) {
+            streamer.addData(i, i);
+        }
+
+        streamer.flush();
+
+        ignite(2).close();
+
+        for (int i = 40; i < 60; i++) {
+            streamer.addData(i, i);
+        }
+
+        streamer.close();
+
+        spi(ignite).blockMessages((node, message) -> {
+            if (message instanceof GridDhtPartitionSupplyMessage && testNodeName(2).equals(node.consistentId())) {
+                GridDhtPartitionSupplyMessage supplyMessage = ((GridDhtPartitionSupplyMessage)message);
+
+                return supplyMessage.groupId() == CU.cacheId(DEFAULT_CACHE_NAME);
+            }
+
+            return false;
+        });
+
+        startGrid(1);
+        startGrid(2);
+
+        AffinityTopologyVersion rebTopVer = getRebalancedTopVer(ignite);
+
+        assertEquals(rebTopVer, getRebalancedTopVer(ignite(1)));
+        assertEquals(rebTopVer, getRebalancedTopVer(ignite(2)));
+
+        HashSet<Integer> keysToUpdate = new HashSet<>(9);
+
+        // The keys were loaded on the nodes.
+        keysToUpdate.add(partitionKeys(0, 0, 20));
+        keysToUpdate.add(partitionKeys(1, 0, 20));
+        keysToUpdate.add(partitionKeys(2, 0, 20));
+
+        // The keys were loaded on topology without one node.
+        keysToUpdate.add(partitionKeys(0, 20, 40));
+        keysToUpdate.add(partitionKeys(1, 20, 40));
+        keysToUpdate.add(partitionKeys(2, 20, 40));
+
+        // The keys were loaded on topology without two nodes.
+        keysToUpdate.add(partitionKeys(0, 40, 60));
+        keysToUpdate.add(partitionKeys(1, 40, 60));
+        keysToUpdate.add(partitionKeys(2, 40, 60));
+
+        for (Integer key : keysToUpdate) {
+            info("Intention to invike [key: " + key +
+                " part: " + ignite.affinity(DEFAULT_CACHE_NAME).partition(key) +
+                " primary: " + ignite.affinity(DEFAULT_CACHE_NAME).mapKeyToNode(key) + ']');
+        }
+
+        HashMap<Integer, EntryProcessor<Integer, Integer, Void>> invokes = new HashMap<>(keysToUpdate.size());
+
+        for (Integer key : keysToUpdate) {
+            invokes.put(key, new TestEntryProcessor(100));
+        }
+
+        checkTopology(3);
+
+        ignite.<Integer, Integer>cache(DEFAULT_CACHE_NAME).invokeAll(invokes);
+
+        spi(ignite).stopBlock();
+
+        awaitPartitionMapExchange();
+
+        rebTopVer = getRebalancedTopVer(ignite);
+
+        assertEquals(rebTopVer, getRebalancedTopVer(ignite(1)));
+        assertEquals(rebTopVer, getRebalancedTopVer(ignite(2)));
+
+        assertPartitionsSame(idleVerify(ignite, DEFAULT_CACHE_NAME));
+    }
+
+    /**
+     * Finds a key from the range {@code [from, to)} that maps to the given partition.
+     *
+     * @param part Partition.
+     * @param from Left search bound (inclusive).
+     * @param to   Right search bound (exclusive).
+     * @return A key.
+     */
+    protected Integer partitionKeys(int part, int from, int to) {
+        Affinity<Integer> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
+
+        for (int k = from; k < to; k++) {
+            if (aff.partition(k) == part) {
+                return k;
+            }
+        }
+
+        throw new AssertionError("Key was not found [pat=" + part + ", from=" + from + ", to=" + to + ']');
+    }
+
+    /**
+     * Gets rebalance topology version for the Ignite instance.
+     *
+     * @param instance Ignite instance.
+     * @return Topology version.
+     */
+    private static AffinityTopologyVersion getRebalancedTopVer(IgniteEx instance) {
+        return ((GridDhtPartitionTopologyImpl)instance.context().cache()
+            .cache(DEFAULT_CACHE_NAME).context().topology()).getRebalancedTopVer();
+    }
+
+    /**
+     * The entry processor is intended to update a value when the previous one exists.
+     */
+    private static class TestEntryProcessor implements EntryProcessor<Integer, Integer, Void> {
+        @IgniteInstanceResource
+        Ignite ignite;
+
+        private final Integer val;
+
+        public TestEntryProcessor(Integer val) {
+            this.val = val;
+        }
+
+        @Override public Void process(
+            MutableEntry<Integer, Integer> mutableEntry,
+            Object... objects
+        ) throws EntryProcessorException {
+            log.info("Updating entry [from=" + mutableEntry.getValue() + ", to=" + val + ']');
+
+            if (!mutableEntry.exists())
+                return null;
+
+            Integer entryVal = mutableEntry.getValue();
+
+            if (entryVal == null)
+                return null;
+
+            mutableEntry.setValue(val);
+
+            log.info("Updated entry [from=" + entryVal + ", to=" + val + ']');
+
+            return null;
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
index 0ff129fee58..e5cf49f3497 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import org.apache.ignite.cache.ReplicationCacheConsistencyOnUnstableTopologyTest;
 import org.apache.ignite.internal.processors.cache.CachePutIfAbsentTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectionsSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest;
@@ -99,6 +100,7 @@ public class IgniteCacheTestSuite9 {
         GridTestUtils.addTestIfNeeded(suite, TxPartitionCounterStateConsistencyVolatileRebalanceTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, TxCrossCachePartitionConsistencyTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, TxPartitionCounterStateConsistencyNoopInvokeTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, ReplicationCacheConsistencyOnUnstableTopologyTest.class, ignoredTests);
 
         return suite;
     }