You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2023/12/28 11:52:02 UTC
(ignite) 01/03: IGNITE-21170 Partition divergence on instable topology in replicated cahce
This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch ignite-21170
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit a7d5e215ce8aa790b27040f694b78dffc3ae30c4
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Wed Dec 27 22:44:44 2023 +0300
IGNITE-21170 Partition divergence on instable topology in replicated cahce
---
.../cache/distributed/dht/GridDhtCacheEntry.java | 6 +-
.../dht/topology/GridDhtPartitionTopologyImpl.java | 96 +++---
.../continuous/CacheContinuousQueryHandler.java | 2 +-
...tionCacheConsistencyOnUnstableTopologyTest.java | 336 +++++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite9.java | 2 +
5 files changed, 398 insertions(+), 44 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 05de436568e..d7d82868e1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -105,8 +105,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
return locPart.nextUpdateCounter(cctx.cacheId(), topVer, primary, init, primaryCntr);
}
catch (Throwable t) {
- log.error("Failed to update counter for atomic cache [" +
- ", initial=" + init +
+ log.error("Failed to update counter for atomic cache [initial=" + init +
", primaryCntr=" + primaryCntr +
", part=" + locPart + ']', t);
@@ -120,8 +119,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
return locPart.nextUpdateCounter(cctx.cacheId(), tx, primaryCntr);
}
catch (Throwable t) {
- log.error("Failed to update counter for tx cache [" +
- ", primaryCntr=" + primaryCntr +
+ log.error("Failed to update counter for tx cache [primaryCntr=" + primaryCntr +
", part=" + locPart + ", tx=" + CU.txString(tx) + ']', t);
throw t;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 7625fe7e813..394707e4f12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -118,10 +118,16 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** */
private Set<Integer> lostParts;
- /** */
+ /**
+ * The set of nodes on which a partition exists although they are not part of its ideal assignment.
+ * This property is not calculated for replicated caches.
+ */
private final Map<Integer, Set<UUID>> diffFromAffinity = new HashMap<>();
- /** */
+ /**
+ * The topology version at which the partition map was last updated.
+ * It is also the topology version for which {@code diffFromAffinity} was calculated.
+ */
private volatile AffinityTopologyVersion diffFromAffinityVer = AffinityTopologyVersion.NONE;
/** Last started exchange version (always >= readyTopVer). */
@@ -317,6 +323,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
return topVer;
}
+ /**
+ * Gets the last topology version where partition distribution is considered as the ideal one.
+ *
+ * @return Affinity topology version.
+ */
+ public AffinityTopologyVersion getRebalancedTopVer() {
+ return rebalancedTopVer;
+ }
+
/** {@inheritDoc} */
@Override public AffinityTopologyVersion lastTopologyChangeVersion() {
AffinityTopologyVersion topVer = this.lastTopChangeVer;
@@ -1613,28 +1628,29 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
", fullMap=" + fullMapString() + ']');
}
- if (exchangeVer == null && !grp.isReplicated() &&
- (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) {
- AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
+ if (exchangeVer == null && (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) {
+ if (!grp.isReplicated()) {
+ AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
- for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
- for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
- int p = e0.getKey();
+ for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
+ for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
+ int p = e0.getKey();
- Set<UUID> diffIds = diffFromAffinity.get(p);
+ Set<UUID> diffIds = diffFromAffinity.get(p);
- if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue() == RENTING) &&
- !affAssignment.getIds(p).contains(e.getKey())) {
+ if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue() == RENTING) &&
+ !affAssignment.getIds(p).contains(e.getKey())) {
- if (diffIds == null)
- diffFromAffinity.put(p, diffIds = U.newHashSet(3));
+ if (diffIds == null)
+ diffFromAffinity.put(p, diffIds = U.newHashSet(3));
- diffIds.add(e.getKey());
- }
- else {
- if (diffIds != null && diffIds.remove(e.getKey())) {
- if (diffIds.isEmpty())
- diffFromAffinity.remove(p);
+ diffIds.add(e.getKey());
+ }
+ else {
+ if (diffIds != null && diffIds.remove(e.getKey())) {
+ if (diffIds.isEmpty())
+ diffFromAffinity.remove(p);
+ }
}
}
}
@@ -1926,8 +1942,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
node2part.put(parts.nodeId(), parts);
// During exchange diff is calculated after all messages are received and affinity initialized.
- if (exchId == null && !grp.isReplicated()) {
- if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) {
+ if (exchId == null && readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) {
+ if (!grp.isReplicated()) {
AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
// Add new mappings.
@@ -1953,6 +1969,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
}
+ }
// Remove obsolete mappings.
if (cur != null) {
@@ -1968,8 +1985,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
- diffFromAffinityVer = readyTopVer;
- }
+ diffFromAffinityVer = readyTopVer;
}
if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) {
@@ -2020,22 +2036,13 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (fut != null)
discoCache = fut.events().discoveryCache();
- if (!grp.isReplicated()) {
- boolean rebuildDiff = fut == null || fut.localJoinExchange() || fut.serverNodeDiscoveryEvent() ||
- fut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || !diffFromAffinityVer.initialized();
-
- if (rebuildDiff) {
- if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0)
- rebuildDiff(assignment);
- }
- else
- diffFromAffinityVer = readyTopVer;
-
- if (!updateRebalanceVer)
- updateRebalanceVersion(assignment.topologyVersion(), assignment.assignment());
- }
+ if (!grp.isReplicated() && isRebuildDiffAssignmentRequired(fut)) {
+ if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0)
+ rebuildDiff(assignment);
+ } else
+ diffFromAffinityVer = readyTopVer;
- if (updateRebalanceVer)
+ if (!grp.isReplicated() || updateRebalanceVer)
updateRebalanceVersion(assignment.topologyVersion(), assignment.assignment());
// Own orphan moving partitions (having no suppliers).
@@ -2047,6 +2054,17 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
+ /**
+ * Checks whether the rebuild is required.
+ *
+ * @param fut Exchange future.
+ * @return True if the rebuild difference assignment is required, false otherwise.
+ */
+ private boolean isRebuildDiffAssignmentRequired(@Nullable GridDhtPartitionsExchangeFuture fut) {
+ return fut == null || fut.localJoinExchange() || fut.serverNodeDiscoveryEvent() ||
+ fut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || !diffFromAffinityVer.initialized();
+ }
+
/**
* Check if some local moving partitions have no suitable supplier and own them.
* This can happen if a partition has been created by affinity assignment on new node and no supplier exists.
@@ -3121,7 +3139,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @param aff Affinity assignments.
*/
private void updateRebalanceVersion(AffinityTopologyVersion affVer, List<List<ClusterNode>> aff) {
- if (!grp.isReplicated() && !affVer.equals(diffFromAffinityVer))
+ if (!affVer.equals(diffFromAffinityVer))
return;
if (!rebalancedTopVer.equals(readyTopVer)) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 4118e9bd854..19ec83a9bd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -1007,7 +1007,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
GridCacheContext<K, V> cctx = cacheContext(ctx);
- IgniteCache<?, ?> cache = ctx.cache().jcache(cctx.name());
+ IgniteCache<?, ?> cache = ctx.cache().jcache(cacheName);
// Initial query entry or evicted entry. These events should be fired immediately.
if (internal || e.updateCounter() == -1L)
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/ReplicationCacheConsistencyOnUnstableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/cache/ReplicationCacheConsistencyOnUnstableTopologyTest.java
new file mode 100644
index 00000000000..12e5bb539f7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/ReplicationCacheConsistencyOnUnstableTopologyTest.java
@@ -0,0 +1,336 @@
+/*
+ * Copyright 2023 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+
+/**
+ * The test demonstrates the synchronization of shifting between topology versions on clusters during rebalance.
+ * Two nodes join the topology simultaneously, and the rebalancing topology on the entire cluster will be switched
+ * only when the rebalancing of both nodes has finished.
+ */
+public class ReplicationCacheConsistencyOnUnstableTopologyTest extends GridCommonAbstractTest {
+ /**
+ * Cache mode.
+ */
+ private CacheMode cacheMode;
+
+ /**
+ * Cache write synchronization mode.
+ */
+ private CacheWriteSynchronizationMode writeSynchronizationMode;
+
+ /**
+ * True if the cache read operation can execute on backup replicas.
+ */
+ private boolean readFromBackup;
+
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setConsistentId(igniteInstanceName)
+ .setCommunicationSpi(new TestRecordingCommunicationSpi())
+ .setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setMaxSize(100L * 1024 * 1024)
+ .setPersistenceEnabled(true)))
+ .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+ .setAffinity(new RendezvousAffinityFunction(false, 3))
+ .setCacheMode(cacheMode)
+ .setBackups(2)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setWriteSynchronizationMode(writeSynchronizationMode)
+ .setReadFromBackup(readFromBackup));
+ }
+
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ super.afterTest();
+ }
+
+ @Test
+ public void testReplicatedFullSync() throws Exception {
+ process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.FULL_SYNC, false);
+ }
+
+ @Test
+ public void testReplicatedFullSyncReadFromBackup() throws Exception {
+ process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.FULL_SYNC, true);
+ }
+
+ @Test
+ public void testReplicatedPrimarySync() throws Exception {
+ process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.PRIMARY_SYNC, false);
+ }
+
+ @Test
+ public void testReplicatedPrimarySyncReadFromBackup() throws Exception {
+ process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.PRIMARY_SYNC, true);
+ }
+
+ @Test
+ public void testReplicatedFullAsync() throws Exception {
+ process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.FULL_ASYNC, false);
+ }
+
+ @Test
+ public void testReplicatedFullAsyncReadFromBackup() throws Exception {
+ process(CacheMode.REPLICATED, CacheWriteSynchronizationMode.FULL_ASYNC, true);
+ }
+
+ @Test
+ public void testPartitionedFullSync() throws Exception {
+ process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.FULL_SYNC, false);
+ }
+
+ @Test
+ public void testPartitionedFullSyncReadFromBackup() throws Exception {
+ process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.FULL_SYNC, true);
+ }
+
+ @Test
+ public void testPartitionedPrimarySync() throws Exception {
+ process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.PRIMARY_SYNC, false);
+ }
+
+ @Test
+ public void testPartitionedPrimarySyncReadFromBackup() throws Exception {
+ process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.PRIMARY_SYNC, true);
+ }
+
+ @Test
+ public void testPartitionedFullAsync() throws Exception {
+ process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.FULL_ASYNC, false);
+ }
+
+ @Test
+ public void testPartitionedFullAsyncReadFromBackup() throws Exception {
+ process(CacheMode.PARTITIONED, CacheWriteSynchronizationMode.FULL_ASYNC, true);
+ }
+
+ /**
+ * Executes a test scenario.
+ *
+ * @param cacheMode Cache mode.
+ * @param writeSynchronizationMode Cache write synchronization mode.
+ * @param readFromBackup True if the cache read operation can execute on backup replicas.
+ * @throws Exception If fail.
+ */
+ private void process(
+ CacheMode cacheMode,
+ CacheWriteSynchronizationMode writeSynchronizationMode,
+ boolean readFromBackup
+ ) throws Exception {
+ this.cacheMode = cacheMode;
+ this.writeSynchronizationMode = writeSynchronizationMode;
+ this.readFromBackup = readFromBackup;
+
+ IgniteEx ignite = startGrids(3);
+
+ ignite.cluster().state(ClusterState.ACTIVE);
+
+ awaitPartitionMapExchange();
+
+ assertEquals(0, ignite.cache(DEFAULT_CACHE_NAME).size());
+
+ IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME);
+
+ streamer.allowOverwrite(false);
+
+ for (int i = 0; i < 20; i++) {
+ streamer.addData(i, i);
+ }
+
+ streamer.flush();
+
+ ignite(1).close();
+
+ for (int i = 20; i < 40; i++) {
+ streamer.addData(i, i);
+ }
+
+ streamer.flush();
+
+ ignite(2).close();
+
+ for (int i = 40; i < 60; i++) {
+ streamer.addData(i, i);
+ }
+
+ streamer.close();
+
+ spi(ignite).blockMessages((node, message) -> {
+ if (message instanceof GridDhtPartitionSupplyMessage && testNodeName(2).equals(node.consistentId())) {
+ GridDhtPartitionSupplyMessage supplyMessage = ((GridDhtPartitionSupplyMessage)message);
+
+ return supplyMessage.groupId() == CU.cacheId(DEFAULT_CACHE_NAME);
+ }
+
+ return false;
+ });
+
+ startGrid(1);
+ startGrid(2);
+
+ AffinityTopologyVersion rebTopVer = getRebalancedTopVer(ignite);
+
+ assertEquals(rebTopVer, getRebalancedTopVer(ignite(1)));
+ assertEquals(rebTopVer, getRebalancedTopVer(ignite(2)));
+
+ HashSet<Integer> keysToUpdate = new HashSet<>(9);
+
+ // The keys were loaded on the nodes.
+ keysToUpdate.add(partitionKeys(0, 0, 20));
+ keysToUpdate.add(partitionKeys(1, 0, 20));
+ keysToUpdate.add(partitionKeys(2, 0, 20));
+
+ // The keys were loaded on topology without one node.
+ keysToUpdate.add(partitionKeys(0, 20, 40));
+ keysToUpdate.add(partitionKeys(1, 20, 40));
+ keysToUpdate.add(partitionKeys(2, 20, 40));
+
+ // The keys were loaded on topology without two nodes.
+ keysToUpdate.add(partitionKeys(0, 40, 60));
+ keysToUpdate.add(partitionKeys(1, 40, 60));
+ keysToUpdate.add(partitionKeys(2, 40, 60));
+
+ for (Integer key : keysToUpdate) {
+ info("Intention to invike [key: " + key +
+ " part: " + ignite.affinity(DEFAULT_CACHE_NAME).partition(key) +
+ " primary: " + ignite.affinity(DEFAULT_CACHE_NAME).mapKeyToNode(key) + ']');
+ }
+
+ HashMap<Integer, EntryProcessor<Integer, Integer, Void>> invokes = new HashMap<>(keysToUpdate.size());
+
+ for (Integer key : keysToUpdate) {
+ invokes.put(key, new TestEntryProcessor(100));
+ }
+
+ checkTopology(3);
+
+ ignite.<Integer, Integer>cache(DEFAULT_CACHE_NAME).invokeAll(invokes);
+
+ spi(ignite).stopBlock();
+
+ awaitPartitionMapExchange();
+
+ rebTopVer = getRebalancedTopVer(ignite);
+
+ assertEquals(rebTopVer, getRebalancedTopVer(ignite(1)));
+ assertEquals(rebTopVer, getRebalancedTopVer(ignite(2)));
+
+ assertPartitionsSame(idleVerify(ignite, DEFAULT_CACHE_NAME));
+ }
+
+ /**
+ * Finds a partition key.
+ *
+ * @param part Partition.
+ * @param from Left search bound.
+ * @param to Right search bound.
+ * @return A key.
+ */
+ protected Integer partitionKeys(int part, int from, int to) {
+ Affinity<Integer> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
+
+ for (int k = from; k < to; k++) {
+ if (aff.partition(k) == part) {
+ return k;
+ }
+ }
+
+ throw new AssertionError("Key was not found [pat=" + part + ", from=" + from + ", to=" + to + ']');
+ }
+
+ /**
+ * Gets rebalance topology version for the Ignite instance.
+ *
+ * @param instance Ignite instance.
+ * @return Topology version.
+ */
+ private static AffinityTopologyVersion getRebalancedTopVer(IgniteEx instance) {
+ return ((GridDhtPartitionTopologyImpl)instance.context().cache()
+ .cache(DEFAULT_CACHE_NAME).context().topology()).getRebalancedTopVer();
+ }
+
+ /**
+ * The entry processor is intended to update a value when the previous one exists.
+ */
+ private static class TestEntryProcessor implements EntryProcessor<Integer, Integer, Void> {
+ @IgniteInstanceResource
+ Ignite ignite;
+
+ private final Integer val;
+
+ public TestEntryProcessor(Integer val) {
+ this.val = val;
+ }
+
+ @Override public Void process(
+ MutableEntry<Integer, Integer> mutableEntry,
+ Object... objects
+ ) throws EntryProcessorException {
+ log.info("Updating entry [from=" + mutableEntry.getValue() + ", to=" + val + ']');
+
+ if (!mutableEntry.exists())
+ return null;
+
+ Integer entryVal = mutableEntry.getValue();
+
+ if (entryVal == null)
+ return null;
+
+ mutableEntry.setValue(val);
+
+ log.info("Updated entry [from=" + entryVal + ", to=" + val + ']');
+
+ return null;
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
index 0ff129fee58..e5cf49f3497 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.ignite.cache.ReplicationCacheConsistencyOnUnstableTopologyTest;
import org.apache.ignite.internal.processors.cache.CachePutIfAbsentTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheGetCustomCollectionsSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest;
@@ -99,6 +100,7 @@ public class IgniteCacheTestSuite9 {
GridTestUtils.addTestIfNeeded(suite, TxPartitionCounterStateConsistencyVolatileRebalanceTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, TxCrossCachePartitionConsistencyTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, TxPartitionCounterStateConsistencyNoopInvokeTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, ReplicationCacheConsistencyOnUnstableTopologyTest.class, ignoredTests);
return suite;
}