You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2020/05/15 10:18:46 UTC
[ignite] branch master updated: IGNITE-12617 PME-free switch should
wait for recovery only at affected nodes. (#7490)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new bfd97f4 IGNITE-12617 PME-free switch should wait for recovery only at affected nodes. (#7490)
bfd97f4 is described below
commit bfd97f4d9d24da0ba56dc2384734f5c36be6d754
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Fri May 15 13:18:37 2020 +0300
IGNITE-12617 PME-free switch should wait for recovery only at affected nodes. (#7490)
---
.../cache/CacheAffinitySharedManager.java | 31 ++
.../processors/cache/GridCacheSharedContext.java | 9 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 116 ++++--
.../dht/preloader/latch/ExchangeLatchManager.java | 33 +-
.../cache/transactions/IgniteTxManager.java | 48 ++-
...GridExchangeFreeCellularSwitchAbstractTest.java | 284 +++++++++++++
...ngeFreeCellularSwitchComplexOperationsTest.java | 354 ++++++++++++++++
...ridExchangeFreeCellularSwitchIsolationTest.java | 464 +++++++++++++++++++++
.../distributed/GridExchangeFreeSwitchTest.java | 28 +-
.../testsuites/IgniteCacheMvccTestSuite5.java | 6 +
.../ignite/testsuites/IgniteCacheTestSuite5.java | 5 +
11 files changed, 1314 insertions(+), 64 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 385e74e..dcd18d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -82,6 +82,7 @@ import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -2598,6 +2599,36 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
+ * @return Primary nodes (by ideal assignment) of PARTITIONED cache group partitions the local node is a backup for.
+ */
+ public Set<ClusterNode> idealPrimaryNodesForLocalBackups() {
+ Set<ClusterNode> res = new GridConcurrentHashSet<>();
+
+ ClusterNode loc = cctx.localNode();
+
+ forAllCacheGroups(new IgniteInClosureX<GridAffinityAssignmentCache>() {
+ @Override public void applyx(GridAffinityAssignmentCache aff) {
+ CacheGroupDescriptor desc = cachesRegistry.group(aff.groupId());
+
+ if (desc.config().getCacheMode() == PARTITIONED) {
+ List<List<ClusterNode>> assignment = aff.idealAssignmentRaw();
+
+ HashSet<ClusterNode> primaries = new HashSet<>();
+
+ for (List<ClusterNode> nodes : assignment) {
+ if (nodes.indexOf(loc) > 0)
+ primaries.add(nodes.get(0));
+ }
+
+ res.addAll(primaries);
+ }
+ }
+ });
+
+ return res;
+ }
+
+ /**
*
*/
abstract class CacheGroupHolder {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index ab496be..c1981c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.PluginProvider;
import org.jetbrains.annotations.Nullable;
@@ -960,7 +961,7 @@ public class GridCacheSharedContext<K, V> {
f.add(mvcc().finishAtomicUpdates(topVer));
f.add(mvcc().finishDataStreamerUpdates(topVer));
- IgniteInternalFuture<?> finishLocalTxsFuture = tm().finishLocalTxs(topVer, null);
+ IgniteInternalFuture<?> finishLocalTxsFuture = tm().finishLocalTxs(topVer);
// To properly track progress of finishing local tx updates we explicitly add this future to compound set.
f.add(finishLocalTxsFuture);
f.add(tm().finishAllTxs(finishLocalTxsFuture, topVer));
@@ -979,10 +980,12 @@ public class GridCacheSharedContext<K, V> {
*
* @param topVer Topology version.
* @param node Failed node.
+ * @param filter Recovery filter.
* @return {@code true} if waiting was successful.
*/
- public IgniteInternalFuture<?> partitionRecoveryFuture(AffinityTopologyVersion topVer, ClusterNode node) {
- return tm().finishLocalTxs(topVer, node);
+ public IgniteInternalFuture<?> partitionRecoveryFuture(AffinityTopologyVersion topVer, ClusterNode node,
+ IgnitePredicate<IgniteInternalTx> filter) {
+ return tm().recoverLocalTxs(topVer, node, filter);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index ce5d522..0751cac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -100,6 +100,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsStateValidator;
import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
@@ -122,6 +124,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.Nullable;
@@ -171,7 +174,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private static final boolean SKIP_PARTITION_SIZE_VALIDATION = Boolean.getBoolean(IgniteSystemProperties.IGNITE_SKIP_PARTITION_SIZE_VALIDATION);
/** */
- private static final String DISTRIBUTED_LATCH_ID = "exchange";
+ private static final String EXCHANGE_LATCH_ID = "exchange";
+
+ /** Base latch id used for synchronization during the exchange-free (PME-free) switch. */
+ private static final String EXCHANGE_FREE_LATCH_ID = "exchange-free";
/** */
@GridToStringExclude
@@ -368,8 +374,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** Partitions scheduled for clearing before rebalance for this topology version. */
private Map<Integer, Set<Integer>> clearingPartitions;
- /** This future finished with 'cluster is fully rebalanced' state. */
- private volatile boolean rebalanced;
+ /** Set only when the 'cluster is fully rebalanced' state has been achieved, {@code null} otherwise. */
+ private volatile RebalancedInfo rebalancedInfo;
/**
* @param cctx Cache context.
@@ -1400,7 +1406,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return ExchangeType.CLIENT;
else {
if (wasRebalanced())
- markRebalanced();
+ keepRebalanced();
return ExchangeType.NONE;
}
@@ -1437,9 +1443,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
assert exchCtx.exchangeFreeSwitch();
- assert wasRebalanced() : this;
-
- markRebalanced(); // Still rebalanced.
+ keepRebalanced(); // Still rebalanced.
onLeft();
@@ -1538,8 +1542,41 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
boolean skipWaitOnLocalJoin = localJoinExchange()
&& cctx.exchange().latch().canSkipJoiningNodes(initialVersion());
- if (context().exchangeFreeSwitch() && isBaselineNodeFailed())
- waitPartitionRelease(true, false);
+ if (context().exchangeFreeSwitch() && isBaselineNodeFailed()) {
+ // Currently MVCC does not support operations on partially switched cluster.
+ if (cctx.kernalContext().coordinators().mvccEnabled())
+ waitPartitionRelease(EXCHANGE_FREE_LATCH_ID, true, false, null);
+ else {
+ boolean partitionedRecoveryRequired = rebalancedInfo.primaryNodes.contains(firstDiscoEvt.eventNode());
+
+ IgnitePredicate<IgniteInternalTx> replicatedOnly = tx -> {
+ Collection<IgniteTxEntry> entries = tx.writeEntries();
+ for (IgniteTxEntry entry : entries)
+ if (cctx.cacheContext(entry.cacheId()).isReplicated())
+ return true;
+
+ // Checks that only affected nodes contain replicated-free txs with failed primaries.
+ assert partitionedRecoveryRequired;
+
+ return false;
+ };
+
+ // Assuming that replicated transactions are absent, non-affected nodes will wait only for this short sync.
+ waitPartitionRelease(EXCHANGE_FREE_LATCH_ID + "-replicated", true, false, replicatedOnly);
+
+ String partitionedLatchId = EXCHANGE_FREE_LATCH_ID + "-partitioned";
+
+ if (partitionedRecoveryRequired)
+ // This node contains backup partitions for the failed primaries of partitioned caches. Waiting for recovery.
+ waitPartitionRelease(partitionedLatchId, true, false, null);
+ else {
+ // This node contains no backup partitions for the failed primaries of partitioned caches. Recovery is not needed.
+ Latch releaseLatch = cctx.exchange().latch().getOrCreate(partitionedLatchId, initialVersion());
+
+ releaseLatch.countDown(); // Await-free confirmation.
+ }
+ }
+ }
else if (!skipWaitOnLocalJoin) { // Skip partition release if node has locally joined (it doesn't have any updates to be finished).
boolean distributed = true;
@@ -1548,11 +1585,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
distributed = false;
// On first phase we wait for finishing all local tx updates, atomic updates and lock releases on all nodes.
- waitPartitionRelease(distributed, true);
+ waitPartitionRelease(EXCHANGE_LATCH_ID, distributed, true, null);
// Second phase is needed to wait for finishing all tx updates from primary to backup nodes remaining after first phase.
if (distributed)
- waitPartitionRelease(false, false);
+ waitPartitionRelease(EXCHANGE_LATCH_ID, false, false, null);
}
else {
if (log.isInfoEnabled())
@@ -1721,18 +1758,23 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* The main purpose of this method is to wait for all ongoing updates (transactional and atomic), initiated on
* the previous topology version, to finish to prevent inconsistencies during rebalancing and to prevent two
* different simultaneous owners of the same lock.
- * For the exact list of the objects being awaited for see
- * {@link GridCacheSharedContext#partitionReleaseFuture(AffinityTopologyVersion)} javadoc.
* Also, this method can be used to wait for tx recovery only in case of PME-free switch.
- * See {@link GridCacheSharedContext#partitionRecoveryFuture(AffinityTopologyVersion, ClusterNode)} for additional details.
*
+ * @param latchId Distributed latch Id.
* @param distributed If {@code true} then node should wait for partition release completion on all other nodes.
* @param doRollback If {@code true} tries to rollback transactions which lock partitions. Avoids unnecessary calls
* of {@link org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager#rollbackOnTopologyChange}
+ * @param filter Recovery filter, must be {@code null} unless this is an exchange-free switch.
*
* @throws IgniteCheckedException If failed.
*/
- private void waitPartitionRelease(boolean distributed, boolean doRollback) throws IgniteCheckedException {
+ private void waitPartitionRelease(
+ String latchId,
+ boolean distributed,
+ boolean doRollback,
+ IgnitePredicate<IgniteInternalTx> filter) throws IgniteCheckedException {
+ assert context().exchangeFreeSwitch() || filter == null;
+
Latch releaseLatch = null;
IgniteInternalFuture<?> partReleaseFut;
@@ -1742,10 +1784,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
try {
// Wait for other nodes only on first phase.
if (distributed)
- releaseLatch = cctx.exchange().latch().getOrCreate(DISTRIBUTED_LATCH_ID, initialVersion());
+ releaseLatch = cctx.exchange().latch().getOrCreate(latchId, initialVersion());
partReleaseFut = context().exchangeFreeSwitch() && isBaselineNodeFailed() ?
- cctx.partitionRecoveryFuture(initialVersion(), firstDiscoEvt.eventNode()) :
+ cctx.partitionRecoveryFuture(initialVersion(), firstDiscoEvt.eventNode(), filter) :
cctx.partitionReleaseFuture(initialVersion());
// Assign to class variable so it will be included into toString() method.
@@ -1869,7 +1911,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
- timeBag.finishGlobalStage("Wait partitions release");
+ timeBag.finishGlobalStage("Wait partitions release [latch=" + latchId + "]");
if (releaseLatch == null) {
assert !distributed : "Partitions release latch must be initialized in distributed mode.";
@@ -1923,7 +1965,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
U.warn(log, "Stop waiting for partitions release latch: " + e.getMessage());
}
- timeBag.finishGlobalStage("Wait partitions release latch");
+ timeBag.finishGlobalStage("Wait partitions release latch [latch=" + latchId + "]");
}
/**
@@ -2422,7 +2464,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (err == null) {
updateDurationHistogram(System.currentTimeMillis() - initTime);
- cctx.exchange().clusterRebalancedMetric().value(rebalanced);
+ cctx.exchange().clusterRebalancedMetric().value(rebalanced());
}
if (log.isInfoEnabled()) {
@@ -2449,7 +2491,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
initFut.onDone(err == null);
- cctx.exchange().latch().dropLatch(DISTRIBUTED_LATCH_ID, initialVersion());
+ cctx.exchange().latch().dropClientLatches(initialVersion());
if (exchCtx != null && exchCtx.events().hasServerLeft()) {
ExchangeDiscoveryEvents evts = exchCtx.events();
@@ -5097,7 +5139,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @return {@code True} if cluster fully rebalanced.
*/
public boolean rebalanced() {
- return rebalanced;
+ return rebalancedInfo != null;
}
/**
@@ -5114,10 +5156,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* Sets cluster fully rebalanced flag.
*/
- public void markRebalanced() {
- assert !rebalanced;
+ private void markRebalanced() {
+ assert !rebalanced();
+
+ rebalancedInfo = new RebalancedInfo(cctx.affinity().idealPrimaryNodesForLocalBackups());
+ }
+
+ /**
+ * Keeps the 'cluster is fully rebalanced' state by reusing the info of the last finished exchange future.
+ */
+ private void keepRebalanced() {
+ assert !rebalanced() && wasRebalanced();
- rebalanced = true;
+ rebalancedInfo = sharedContext().exchange().lastFinishedFuture().rebalancedInfo;
}
/**
@@ -5401,4 +5452,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** This exchange was merged with another one. */
MERGED
}
+
+ /**
+ * Holds the data captured when the 'cluster is fully rebalanced' state is set.
+ */
+ private static class RebalancedInfo {
+ /** Primary nodes for local backups for all registered partitioned caches. */
+ private Set<ClusterNode> primaryNodes;
+
+ /**
+ * @param primaryNodes Primary nodes for local backups.
+ */
+ public RebalancedInfo(Set<ClusterNode> primaryNodes) {
+ this.primaryNodes = primaryNodes;
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
index 006d0c0..92af6ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -241,29 +241,28 @@ public class ExchangeLatchManager {
}
/**
- * Drops the latch created by {@link #getOrCreate(String, AffinityTopologyVersion)}. The corresponding
- * latch should be created before this method is invoked.
+ * Drops client latches created by {@link #getOrCreate(String, AffinityTopologyVersion)}. The corresponding
+ * latches should be created before this method is invoked.
* <p>
- * This method must be called when it is guaranteed that all nodes have processed the latch messages. In
+ * This method must be called when it is guaranteed that all nodes have processed the latch messages. In
* the context of partitions map exchange this can be done when exchange future is completed.
*
- * @param id Latch id.
* @param topVer Latch topology version.
*/
- public void dropLatch(String id, AffinityTopologyVersion topVer) {
+ public void dropClientLatches(AffinityTopologyVersion topVer) {
lock.lock();
try {
- final CompletableLatchUid latchUid = new CompletableLatchUid(id, topVer);
-
- ClientLatch clientLatch = clientLatches.remove(latchUid);
- ServerLatch srvLatch = serverLatches.remove(latchUid);
+ for (CompletableLatchUid latchUid : clientLatches.keySet()) {
+ if (latchUid.topVer.equals(topVer)) {
+ ClientLatch latch = clientLatches.remove(latchUid);
- if (log.isDebugEnabled())
- log.debug("Dropping latch [id=" + id + ", topVer=" + topVer + ", srvLatch=" + srvLatch +
- ", clientLatch=" + clientLatch + ']');
+ if (log.isDebugEnabled())
+ log.debug("Dropping client latch [id=" + latchUid + ", latch=" + latch + ']');
- pendingAcks.remove(latchUid);
+ pendingAcks.remove(latchUid);
+ }
+ }
}
finally {
lock.unlock();
@@ -634,8 +633,14 @@ public class ExchangeLatchManager {
if (log.isDebugEnabled())
log.debug("Count down [latch=" + latchId() + ", remaining=" + remaining + "]");
- if (remaining == 0)
+ if (remaining == 0) {
complete();
+
+ serverLatches.remove(id);
+
+ if (log.isDebugEnabled())
+ log.debug("Dropping server latch [id=" + id + ", latch=" + this + ']');
+ }
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index f839ab1..36f1a4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -105,6 +105,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.spi.systemview.view.TransactionView;
@@ -825,10 +826,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* </ul>
*
* @param topVer Topology version.
- * @param node Cluster node.
* @return Future that will be completed when all ongoing transactions are finished.
*/
- public IgniteInternalFuture<Boolean> finishLocalTxs(AffinityTopologyVersion topVer, ClusterNode node) {
+ public IgniteInternalFuture<Boolean> finishLocalTxs(AffinityTopologyVersion topVer) {
GridCompoundFuture<IgniteInternalTx, Boolean> res =
new CacheObjectsReleaseFuture<>(
"LocalTx",
@@ -844,15 +844,47 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
});
for (IgniteInternalTx tx : activeTransactions()) {
- if (node != null) {
- if (tx.originatingNodeId().equals(node.id())) {
- assert needWaitTransaction(tx, topVer);
+ if (needWaitTransaction(tx, topVer))
+ res.add(tx.finishFuture());
+ }
+
+ res.markInitialized();
+ return res;
+ }
+
+ /**
+ * Creates a future that will wait for recovery of all transactions whose primary node failed.
+ *
+ * @param topVer Topology version.
+ * @param node Failed node.
+ * @param filter Recovery filter, or {@code null} to wait for every transaction originated by the failed node.
+ * @return Future that will be completed when all affected transactions are recovered.
+ */
+ public IgniteInternalFuture<Boolean> recoverLocalTxs(AffinityTopologyVersion topVer, ClusterNode node,
+ IgnitePredicate<IgniteInternalTx> filter) {
+
+ GridCompoundFuture<IgniteInternalTx, Boolean> res =
+ new CacheObjectsReleaseFuture<>(
+ "TxRecovery",
+ topVer,
+ new IgniteReducer<IgniteInternalTx, Boolean>() {
+ @Override public boolean collect(IgniteInternalTx e) {
+ return true;
+ }
+
+ @Override public Boolean reduce() {
+ return true;
+ }
+ });
+
+ for (IgniteInternalTx tx : activeTransactions()) {
+ if (tx.dht() && !tx.local() && tx.originatingNodeId().equals(node.id())) {
+ assert needWaitTransaction(tx, topVer);
+
+ if (filter == null || filter.apply(tx))
res.add(tx.finishFuture());
- }
}
- else if (needWaitTransaction(tx, topVer))
- res.add(tx.finishFuture());
}
res.markInitialized();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchAbstractTest.java
new file mode 100644
index 0000000..8795f64
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchAbstractTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Base class for exchange-free cellular switch tests: provides node configuration, test caches and common checks.
+ */
+public abstract class GridExchangeFreeCellularSwitchAbstractTest extends GridCommonAbstractTest {
+ /** Partitioned cache name. */
+ protected static final String PART_CACHE_NAME = "partitioned";
+
+ /** Replicated cache name. */
+ protected static final String REPL_CACHE_NAME = "replicated";
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+ cfg.setCommunicationSpi(commSpi);
+
+ cfg.setCacheConfiguration(cacheConfiguration());
+
+ cfg.setClusterStateOnStart(ClusterState.INACTIVE);
+
+ DataStorageConfiguration dsCfg = new DataStorageConfiguration();
+
+ DataRegionConfiguration drCfg = new DataRegionConfiguration();
+
+ drCfg.setPersistenceEnabled(true);
+
+ dsCfg.setDefaultDataRegionConfiguration(drCfg);
+
+ cfg.setDataStorageConfiguration(dsCfg);
+
+ return cfg;
+ }
+
+ /**
+ * @return Configurations of the transactional partitioned (2 backups) and transactional replicated test caches.
+ */
+ private CacheConfiguration<?, ?>[] cacheConfiguration() {
+ CacheConfiguration<?, ?> partitionedCcfg = new CacheConfiguration<>();
+
+ partitionedCcfg.setName(PART_CACHE_NAME);
+ partitionedCcfg.setWriteSynchronizationMode(FULL_SYNC);
+ partitionedCcfg.setBackups(2);
+ partitionedCcfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ partitionedCcfg.setAffinity(new Map6PartitionsTo6NodesTo2CellsAffinityFunction());
+
+ CacheConfiguration<?, ?> replicatedCcfg = new CacheConfiguration<>();
+
+ replicatedCcfg.setName(REPL_CACHE_NAME);
+ replicatedCcfg.setWriteSynchronizationMode(FULL_SYNC);
+ replicatedCcfg.setCacheMode(CacheMode.REPLICATED);
+ replicatedCcfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+ return new CacheConfiguration[] {partitionedCcfg, replicatedCcfg};
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * Asserts that the last topology future of every started node except {@code failed} eventually has a 'left' exchange id.
+ */
+ protected void awaitForSwitchOnNodeLeft(Ignite failed) throws IgniteInterruptedCheckedException {
+ assertTrue(GridTestUtils.waitForCondition(
+ () -> {
+ for (Ignite ignite : G.allGrids()) {
+ if (ignite == failed)
+ continue;
+
+ GridDhtPartitionsExchangeFuture fut =
+ ((IgniteEx)ignite).context().cache().context().exchange().lastTopologyFuture();
+
+ if (!fut.exchangeId().isLeft())
+ return false;
+ }
+
+ return true;
+ }, 5000));
+ }
+
+ /**
+ * Blocks {@link GridCacheTxRecoveryRequest} messages on the communication SPI of every started node.
+ */
+ protected void blockRecoveryMessages() {
+ for (Ignite ignite : G.allGrids()) {
+ TestRecordingCommunicationSpi spi =
+ (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+ spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ return msg.getClass().equals(GridCacheTxRecoveryRequest.class);
+ }
+ });
+ }
+ }
+
+ /**
+ * Asserts the expected count of distinct active transactions (by near xid version, limited to {@code vers} if set) per node role.
+ */
+ protected void checkTransactionsCount(
+ Ignite orig,
+ int origCnt,
+ Ignite primary,
+ int primaryCnt,
+ List<Ignite> backupNodes,
+ int backupCnt,
+ List<Ignite> nearNodes,
+ int nearCnt,
+ Set<GridCacheVersion> vers) {
+ Function<Ignite, Collection<GridCacheVersion>> txs = ignite -> {
+ Collection<IgniteInternalTx> active = ((IgniteEx)ignite).context().cache().context().tm().activeTransactions();
+
+ // Transactions originally started at backups will be represented as a single element.
+ return active.stream()
+ .map(IgniteInternalTx::nearXidVersion)
+ .filter(ver -> vers == null || vers.contains(ver))
+ .collect(Collectors.toSet());
+ };
+
+ if (orig != null)
+ assertEquals(origCnt, txs.apply(orig).size());
+
+ if (primary != null && primary != orig)
+ assertEquals(primaryCnt, txs.apply(primary).size());
+
+ for (Ignite backup : backupNodes)
+ if (backup != orig)
+ assertEquals(backupCnt, txs.apply(backup).size());
+
+ for (Ignite near : nearNodes)
+ if (near != orig)
+ assertEquals(nearCnt, txs.apply(near).size());
+ }
+
+ /**
+ * Affinity function mapping 6 partitions to 6 nodes split into 2 cells of 3 nodes; empty for other topology sizes.
+ */
+ protected static class Map6PartitionsTo6NodesTo2CellsAffinityFunction extends RendezvousAffinityFunction {
+ /**
+ * Default constructor.
+ */
+ public Map6PartitionsTo6NodesTo2CellsAffinityFunction() {
+ super(false, 6);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+ List<List<ClusterNode>> res = new ArrayList<>(6);
+
+ int backups = affCtx.backups();
+
+ assert backups == 2;
+
+ if (affCtx.currentTopologySnapshot().size() == 6) {
+ List<ClusterNode> p0 = new ArrayList<>();
+ List<ClusterNode> p1 = new ArrayList<>();
+ List<ClusterNode> p2 = new ArrayList<>();
+ List<ClusterNode> p3 = new ArrayList<>();
+ List<ClusterNode> p4 = new ArrayList<>();
+ List<ClusterNode> p5 = new ArrayList<>();
+
+ // Cell 1.
+ p0.add(affCtx.currentTopologySnapshot().get(0));
+ p0.add(affCtx.currentTopologySnapshot().get(1));
+ p0.add(affCtx.currentTopologySnapshot().get(2));
+
+ p1.add(affCtx.currentTopologySnapshot().get(2));
+ p1.add(affCtx.currentTopologySnapshot().get(0));
+ p1.add(affCtx.currentTopologySnapshot().get(1));
+
+ p2.add(affCtx.currentTopologySnapshot().get(1));
+ p2.add(affCtx.currentTopologySnapshot().get(2));
+ p2.add(affCtx.currentTopologySnapshot().get(0));
+
+ // Cell 2.
+ p3.add(affCtx.currentTopologySnapshot().get(3));
+ p3.add(affCtx.currentTopologySnapshot().get(4));
+ p3.add(affCtx.currentTopologySnapshot().get(5));
+
+ p4.add(affCtx.currentTopologySnapshot().get(5));
+ p4.add(affCtx.currentTopologySnapshot().get(3));
+ p4.add(affCtx.currentTopologySnapshot().get(4));
+
+ p5.add(affCtx.currentTopologySnapshot().get(4));
+ p5.add(affCtx.currentTopologySnapshot().get(5));
+ p5.add(affCtx.currentTopologySnapshot().get(3));
+
+ res.add(p0);
+ res.add(p1);
+ res.add(p2);
+ res.add(p3);
+ res.add(p4);
+ res.add(p5);
+ }
+
+ return res;
+ }
+ }
+
+ /**
+ * Specifies the node that starts the transaction (originating node).
+ */
+ protected enum TransactionCoordinatorNode {
+ /** Primary. */
+ PRIMARY,
+
+ /** Backup. */
+ BACKUP,
+
+ /** Near. */
+ NEAR,
+
+ /** Client. */
+ CLIENT
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchComplexOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchComplexOperationsTest.java
new file mode 100644
index 0000000..2bbcc84
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchComplexOperationsTest.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
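+/**
+ * Tests recovery of complex (multi-key, multi-cache) transactional operations on an exchange-free cellular switch.
+ */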
+@RunWith(Parameterized.class)
+public class GridExchangeFreeCellularSwitchComplexOperationsTest extends GridExchangeFreeCellularSwitchAbstractTest {
+ /** Transaction concurrency mode (test parameter 0). */
+ @Parameterized.Parameter(0)
+ public TransactionConcurrency concurrency;
+
+ /** Transaction isolation level (test parameter 1). */
+ @Parameterized.Parameter(1)
+ public TransactionIsolation isolation;
+
+ /** Node the transactions are started from (test parameter 2). */
+ @Parameterized.Parameter(2)
+ public TransactionCoordinatorNode startFrom;
+
+ /**
+ * @return Test parameters: every combination of concurrency mode, isolation level and originating node.
+ */
+ @Parameterized.Parameters(name = "Concurrency = {0}, Isolation = {1}, Started from = {2}")
+ public static Collection<Object[]> runConfig() {
+ ArrayList<Object[]> params = new ArrayList<>();
+ for (TransactionConcurrency concurrency : TransactionConcurrency.values())
+ for (TransactionIsolation isolation : TransactionIsolation.values())
+ for (TransactionCoordinatorNode from : TransactionCoordinatorNode.values())
+ params.add(new Object[] {concurrency, isolation, from}); // Order must match the @Parameter indexes.
+
+ return params;
+ }
+
+ /**
+ * Prepares several transactions, stops a random node and checks that every written key is readable on all remaining nodes.
+ */
+ @Test
+ public void testComplexOperationsRecoveryOnCellularSwitch() throws Exception {
+ int nodes = 6;
+
+ startGridsMultiThreaded(nodes);
+
+ blockRecoveryMessages();
+
+ Ignite failed = G.allGrids().get(new Random().nextInt(nodes));
+
+ Integer partKey = primaryKey(failed.getOrCreateCache(PART_CACHE_NAME));
+
+ List<Ignite> backupNodes = backupNodes(partKey, PART_CACHE_NAME);
+ List<Ignite> nearNodes = new ArrayList<>(G.allGrids());
+
+ nearNodes.remove(failed);
+ nearNodes.removeAll(backupNodes);
+
+ assertTrue(Collections.disjoint(backupNodes, nearNodes));
+ assertEquals(nodes / 2 - 1, backupNodes.size()); // Cell 1.
+ assertEquals(nodes / 2, nearNodes.size()); // Cell 2.
+
+ Ignite orig;
+
+ switch (startFrom) {
+ case PRIMARY:
+ orig = failed;
+
+ break;
+
+ case BACKUP:
+ orig = backupNodes.get(0);
+
+ break;
+
+ case NEAR:
+ orig = nearNodes.get(0);
+
+ break;
+
+ case CLIENT:
+ orig = startClientGrid();
+
+ break;
+
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ int recFutsCnt = 7; // 2 single-cache + 3 both-caches + 2 per-cell transactions started below.
+
+ CountDownLatch prepLatch = new CountDownLatch(recFutsCnt);
+ CountDownLatch commitLatch = new CountDownLatch(1);
+
+ Set<Integer> partSet = new GridConcurrentHashSet<>();
+ Set<Integer> replSet = new GridConcurrentHashSet<>();
+
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ AtomicInteger cnt = new AtomicInteger();
+
+ BiFunction<Ignite, String, Integer> nextPrimaryKey = (ignite, cacheName) -> {
+ int idx = cnt.getAndIncrement();
+
+ return primaryKeys(ignite.getOrCreateCache(cacheName), idx + 1).get(idx);
+ };
+
+ BiConsumer<String, Set<Integer>> singlePutEverywhere = (cacheName, globSet) -> {
+ try {
+ Transaction tx = orig.transactions().txStart(concurrency, isolation);
+
+ Set<Integer> set = new HashSet<>();
+
+ for (Ignite ignite : G.allGrids()) {
+ if (ignite.configuration().isClientMode())
+ continue;
+
+ set.add(nextPrimaryKey.apply(ignite, cacheName));
+ }
+
+ globSet.addAll(set);
+
+ IgniteCache<Integer, Integer> cache = orig.getOrCreateCache(cacheName);
+
+ for (Integer key : set)
+ cache.put(key, key);
+
+ ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
+
+ prepLatch.countDown();
+
+ commitLatch.await(); // Released only after the node is stopped and recovery messages are unblocked.
+
+ if (orig != failed) // A transaction originated on the stopped node cannot be committed by its originator.
+ ((TransactionProxyImpl<?, ?>)tx).commit();
+ }
+ catch (Exception e) {
+ fail("Should not happen [exception=" + e + "]");
+ }
+ };
+
+ futs.add(multithreadedAsync(() -> singlePutEverywhere.accept(PART_CACHE_NAME, partSet), 1));
+ futs.add(multithreadedAsync(() -> singlePutEverywhere.accept(REPL_CACHE_NAME, replSet), 1));
+
+ Consumer<Integer> putEverywhereToBoth = (putPerTx) -> {
+ try {
+ Transaction tx = orig.transactions().txStart(concurrency, isolation);
+
+ Set<Integer> pSet = new HashSet<>();
+ Set<Integer> rSet = new HashSet<>();
+
+ for (int i = 0; i < putPerTx; i++)
+ for (Ignite ignite : G.allGrids()) {
+ if (ignite.configuration().isClientMode())
+ continue;
+
+ pSet.add(nextPrimaryKey.apply(ignite, PART_CACHE_NAME));
+ rSet.add(nextPrimaryKey.apply(ignite, REPL_CACHE_NAME));
+ }
+
+ partSet.addAll(pSet);
+ replSet.addAll(rSet);
+
+ IgniteCache<Integer, Integer> pCache = orig.getOrCreateCache(PART_CACHE_NAME);
+ IgniteCache<Integer, Integer> rCache = orig.getOrCreateCache(REPL_CACHE_NAME);
+
+ for (Integer key : pSet)
+ pCache.put(key, key);
+
+ for (Integer key : rSet)
+ rCache.put(key, key);
+
+ ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
+
+ prepLatch.countDown();
+
+ commitLatch.await();
+
+ if (orig != failed)
+ ((TransactionProxyImpl<?, ?>)tx).commit();
+ }
+ catch (Exception e) {
+ fail("Should not happen [exception=" + e + "]");
+ }
+ };
+
+ futs.add(multithreadedAsync(() -> putEverywhereToBoth.accept(1), 1));
+ futs.add(multithreadedAsync(() -> putEverywhereToBoth.accept(2), 1));
+ futs.add(multithreadedAsync(() -> putEverywhereToBoth.accept(10), 1));
+
+ Consumer<Boolean> singleTxPerCell = (partAtCell1) -> {
+ try {
+ Transaction tx = orig.transactions().txStart(concurrency, isolation);
+
+ Integer pKey = partAtCell1 ? nextPrimaryKey.apply(failed, PART_CACHE_NAME) :
+ nextPrimaryKey.apply(nearNodes.get(0), PART_CACHE_NAME);
+
+ Integer rKey = partAtCell1 ? nextPrimaryKey.apply(nearNodes.get(0), REPL_CACHE_NAME) :
+ nextPrimaryKey.apply(failed, REPL_CACHE_NAME);
+
+ IgniteCache<Integer, Integer> pCache = orig.getOrCreateCache(PART_CACHE_NAME);
+ IgniteCache<Integer, Integer> rCache = orig.getOrCreateCache(REPL_CACHE_NAME);
+
+ pCache.put(pKey, pKey);
+ rCache.put(rKey, rKey);
+
+ partSet.add(pKey);
+ replSet.add(rKey);
+
+ ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
+
+ prepLatch.countDown();
+
+ commitLatch.await();
+
+ if (orig != failed)
+ ((TransactionProxyImpl<?, ?>)tx).commit();
+ }
+ catch (Exception e) {
+ fail("Should not happen [exception=" + e + "]");
+ }
+ };
+
+ futs.add(multithreadedAsync(() -> singleTxPerCell.accept(true), 1));
+ futs.add(multithreadedAsync(() -> singleTxPerCell.accept(false), 1));
+
+ prepLatch.await();
+
+ assertEquals(recFutsCnt, futs.size());
+
+ failed.close(); // Stopping node.
+
+ awaitForSwitchOnNodeLeft(failed);
+
+ Consumer<Ignite> partTxRun = (ignite) -> {
+ try {
+ IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(PART_CACHE_NAME);
+
+ try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
+ Integer key = nextPrimaryKey.apply(ignite, PART_CACHE_NAME);
+
+ partSet.add(key);
+
+ cache.put(key, key);
+
+ tx.commit();
+ }
+ }
+ catch (Exception e) {
+ fail("Should not happen [exception=" + e + "]");
+ }
+ };
+
+ Consumer<Ignite> replTxRun = (ignite) -> {
+ try {
+ IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(REPL_CACHE_NAME);
+
+ try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
+ Integer key = nextPrimaryKey.apply(ignite, REPL_CACHE_NAME);
+
+ replSet.add(key);
+
+ cache.put(key, key);
+
+ tx.commit();
+ }
+ }
+ catch (Exception e) {
+ fail("Should not happen [exception=" + e + "]");
+ }
+ };
+
+ for (Ignite backup : backupNodes) {
+ futs.add(multithreadedAsync(() -> partTxRun.accept(backup), 1));
+ futs.add(multithreadedAsync(() -> replTxRun.accept(backup), 1));
+ }
+
+ for (Ignite near : nearNodes) {
+ futs.add(multithreadedAsync(() -> partTxRun.accept(near), 1));
+ futs.add(multithreadedAsync(() -> replTxRun.accept(near), 1));
+ }
+
+ // Allowing recovery.
+ for (Ignite ignite : G.allGrids()) {
+ TestRecordingCommunicationSpi spi =
+ (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+ spi.stopBlock(true, (t) -> true);
+ }
+
+ commitLatch.countDown();
+
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.get();
+
+ for (Ignite node : G.allGrids()) {
+ IgniteCache<Integer, Integer> partCache = node.getOrCreateCache(PART_CACHE_NAME);
+ IgniteCache<Integer, Integer> replCache = node.getOrCreateCache(REPL_CACHE_NAME);
+
+ for (Integer key : partSet)
+ assertEquals(key, partCache.get(key));
+
+ for (Integer key : replSet)
+ assertEquals(key, replCache.get(key));
+ }
+
+ // Final check that no transactions remain.
+ checkTransactionsCount(
+ null, 0,
+ null, 0,
+ backupNodes, 0,
+ nearNodes, 0,
+ null);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchIsolationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchIsolationTest.java
new file mode 100644
index 0000000..e45399f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchIsolationTest.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
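+/**
+ * Tests that on an exchange-free cellular switch only the nodes affected by the node failure wait for tx recovery.
+ */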
+@RunWith(Parameterized.class)
+public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFreeCellularSwitchAbstractTest {
+ /** Node the transactions are started from (test parameter 0). */
+ @Parameterized.Parameter(0)
+ public TransactionCoordinatorNode startFrom;
+
+ /**
+ * @return Single-element parameter arrays, one per {@link TransactionCoordinatorNode} value.
+ */
+ @Parameterized.Parameters(name = "Started from = {0}")
+ public static Collection<Object[]> runConfig() {
+ Collection<Object[]> cfgs = new ArrayList<>();
+
+ for (TransactionCoordinatorNode node : TransactionCoordinatorNode.values())
+ cfgs.add(new Object[] {node});
+
+ return cfgs;
+ }
+
+ /**
+ * Checks that after a node failure the failed node's cell waits for tx recovery while the other cell finishes the switch.
+ */
+ @Test
+ public void testOnlyAffectedNodesWaitForRecovery() throws Exception {
+ int nodes = 6;
+
+ startGridsMultiThreaded(nodes);
+
+ blockRecoveryMessages();
+
+ Ignite failed = G.allGrids().get(new Random().nextInt(nodes));
+
+ Integer partKey = primaryKey(failed.getOrCreateCache(PART_CACHE_NAME));
+ Integer replKey = primaryKey(failed.getOrCreateCache(REPL_CACHE_NAME));
+
+ List<Ignite> backupNodes = backupNodes(partKey, PART_CACHE_NAME);
+ List<Ignite> nearNodes = new ArrayList<>(G.allGrids());
+
+ nearNodes.remove(failed);
+ nearNodes.removeAll(backupNodes);
+
+ assertTrue(Collections.disjoint(backupNodes, nearNodes));
+ assertEquals(nodes / 2 - 1, backupNodes.size()); // Cell 1.
+ assertEquals(nodes / 2, nearNodes.size()); // Cell 2.
+
+ Ignite orig;
+
+ switch (startFrom) {
+ case PRIMARY:
+ orig = failed;
+
+ break;
+
+ case BACKUP:
+ orig = backupNodes.get(0);
+
+ break;
+
+ case NEAR:
+ orig = nearNodes.get(0);
+
+ break;
+
+ case CLIENT:
+ orig = startClientGrid();
+
+ break;
+
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ Set<GridCacheVersion> vers = new GridConcurrentHashSet<>();
+
+ CountDownLatch partPreparedLatch = new CountDownLatch(1);
+ CountDownLatch partCommitLatch = new CountDownLatch(1);
+ CountDownLatch replPreparedLatch = new CountDownLatch(1);
+ CountDownLatch replCommitLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> partFut = multithreadedAsync(() -> {
+ try {
+ checkTransactionsCount(
+ orig, 0,
+ failed, 0,
+ backupNodes, 0,
+ nearNodes, 0,
+ vers);
+
+ Transaction tx = orig.transactions().txStart();
+
+ vers.add(((TransactionProxyImpl<?, ?>)tx).tx().nearXidVersion());
+
+ checkTransactionsCount(
+ orig, 1,
+ failed, 0,
+ backupNodes, 0,
+ nearNodes, 0,
+ vers);
+
+ orig.getOrCreateCache(PART_CACHE_NAME).put(partKey, 42);
+
+ checkTransactionsCount(
+ orig, 1,
+ failed, 1,
+ backupNodes, 0,
+ nearNodes, 0,
+ vers);
+
+ ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
+
+ checkTransactionsCount(
+ orig, 1,
+ failed, 1,
+ backupNodes, 1,
+ nearNodes, 0,
+ vers);
+
+ partPreparedLatch.countDown();
+
+ partCommitLatch.await();
+
+ if (orig != failed)
+ ((TransactionProxyImpl<?, ?>)tx).commit();
+ }
+ catch (Exception e) {
+ fail("Should not happen [exception=" + e + "]");
+ }
+ }, 1);
+
+ AtomicReference<GridCacheVersion> replTxVer = new AtomicReference<>();
+
+ IgniteInternalFuture<?> replFut = multithreadedAsync(() -> {
+ try {
+ partPreparedLatch.await(); // Waiting for partitioned cache tx preparation.
+
+ checkTransactionsCount(
+ orig, 1,
+ failed, 1,
+ backupNodes, 1,
+ nearNodes, 0,
+ vers);
+
+ Transaction tx = orig.transactions().txStart();
+
+ replTxVer.set(((TransactionProxyImpl<?, ?>)tx).tx().nearXidVersion());
+
+ vers.add(replTxVer.get());
+
+ checkTransactionsCount(
+ orig, 2,
+ failed, 1,
+ backupNodes, 1,
+ nearNodes, 0,
+ vers);
+
+ orig.getOrCreateCache(REPL_CACHE_NAME).put(replKey, 43);
+
+ checkTransactionsCount(
+ orig, 2,
+ failed, 2,
+ backupNodes, 1,
+ nearNodes, 0,
+ vers);
+
+ ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
+
+ checkTransactionsCount(
+ orig, 2,
+ failed, 2,
+ backupNodes, 2,
+ nearNodes, 1,
+ vers);
+
+ replPreparedLatch.countDown();
+
+ replCommitLatch.await();
+
+ if (orig != failed)
+ ((TransactionProxyImpl<?, ?>)tx).commit();
+ }
+ catch (Exception e) {
+ fail("Should not happen [exception=" + e + "]");
+ }
+ }, 1);
+
+ partPreparedLatch.await();
+ replPreparedLatch.await();
+
+ checkTransactionsCount(
+ orig, 2,
+ failed, 2,
+ backupNodes, 2,
+ nearNodes, 1,
+ vers);
+
+ failed.close(); // Stopping node.
+
+ awaitForSwitchOnNodeLeft(failed);
+
+ checkTransactionsCount(
+ orig != failed ? orig : null /*stopped*/, 2 /* replicated + partitioned */,
+ null /*stopped*/, 0,
+ backupNodes, 2 /* replicated + partitioned */,
+ nearNodes, 1 /* replicated */,
+ vers);
+
+ BiConsumer<T2<Ignite, String>, T3<CountDownLatch, CountDownLatch, CountDownLatch>> txRun = // Counts down the given latches on tx creation, put and commit.
+ (T2<Ignite, String> pair, T3</*create*/CountDownLatch, /*put*/CountDownLatch, /*commit*/CountDownLatch> latches) -> {
+ try {
+ Ignite ignite = pair.get1();
+ String cacheName = pair.get2();
+
+ IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheName);
+
+ try (Transaction tx = ignite.transactions().txStart()) {
+ latches.get1().countDown(); // Create.
+
+ cache.put(primaryKeys(cache, 100).get(99), 2);
+
+ latches.get2().countDown(); // Put.
+
+ tx.commit();
+
+ latches.get3().countDown(); // Commit.
+ }
+ }
+ catch (Exception e) {
+ fail("Should not happen [exception=" + e + "]");
+ }
+ };
+
+ CountDownLatch replBackupCreateLatch = new CountDownLatch(backupNodes.size());
+ CountDownLatch replBackupPutLatch = new CountDownLatch(backupNodes.size());
+ CountDownLatch replBackupCommitLatch = new CountDownLatch(backupNodes.size());
+ CountDownLatch replNearCreateLatch = new CountDownLatch(nearNodes.size());
+ CountDownLatch replNearPutLatch = new CountDownLatch(nearNodes.size());
+ CountDownLatch replNearCommitLatch = new CountDownLatch(nearNodes.size());
+
+ CountDownLatch partBackupCreateLatch = new CountDownLatch(backupNodes.size());
+ CountDownLatch partBackupPutLatch = new CountDownLatch(backupNodes.size());
+ CountDownLatch partBackupCommitLatch = new CountDownLatch(backupNodes.size());
+ CountDownLatch partNearCreateLatch = new CountDownLatch(nearNodes.size());
+ CountDownLatch partNearPutLatch = new CountDownLatch(nearNodes.size());
+ CountDownLatch partNearCommitLatch = new CountDownLatch(nearNodes.size());
+
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ for (Ignite backup : backupNodes) {
+ futs.add(multithreadedAsync(() ->
+ txRun.accept(new T2<>(backup, REPL_CACHE_NAME),
+ new T3<>(replBackupCreateLatch, replBackupPutLatch, replBackupCommitLatch)), 1));
+ futs.add(multithreadedAsync(() ->
+ txRun.accept(new T2<>(backup, PART_CACHE_NAME),
+ new T3<>(partBackupCreateLatch, partBackupPutLatch, partBackupCommitLatch)), 1));
+ }
+
+ for (Ignite near : nearNodes) {
+ futs.add(multithreadedAsync(() ->
+ txRun.accept(new T2<>(near, REPL_CACHE_NAME),
+ new T3<>(replNearCreateLatch, replNearPutLatch, replNearCommitLatch)), 1));
+ futs.add(multithreadedAsync(() ->
+ txRun.accept(new T2<>(near, PART_CACHE_NAME),
+ new T3<>(partNearCreateLatch, partNearPutLatch, partNearCommitLatch)), 1));
+ }
+
+ // Switch in progress cluster-wide.
+ checkUpcomingTransactionsState(
+ replBackupCreateLatch, 0, // Started.
+ replBackupPutLatch, backupNodes.size(),
+ replBackupCommitLatch, backupNodes.size(),
+ replNearCreateLatch, 0, // Started.
+ replNearPutLatch, nearNodes.size(),
+ replNearCommitLatch, nearNodes.size());
+
+ checkUpcomingTransactionsState(
+ partBackupCreateLatch, 0, // Started.
+ partBackupPutLatch, backupNodes.size(),
+ partBackupCommitLatch, backupNodes.size(),
+ partNearCreateLatch, 0, // Started.
+ partNearPutLatch, nearNodes.size(),
+ partNearCommitLatch, nearNodes.size());
+
+ checkTransactionsCount(
+ orig != failed ? orig : null, 2 /* replicated + partitioned */,
+ null, 0,
+ backupNodes, 2 /* replicated + partitioned */,
+ nearNodes, 1 /* replicated */,
+ vers);
+
+ // Replicated recovery.
+ for (Ignite ignite : G.allGrids()) {
+ TestRecordingCommunicationSpi spi =
+ (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+ spi.stopBlock(true, t -> {
+ Message msg = t.get2().message();
+
+ return ((GridCacheTxRecoveryRequest)msg).nearXidVersion().equals(replTxVer.get());
+ });
+ }
+
+ replCommitLatch.countDown();
+ replFut.get();
+
+ // Switch partially finished.
+ // Cell 1 (backups) still in switch.
+ // Cell 2 (near nodes) finished the switch.
+ checkUpcomingTransactionsState(
+ replBackupCreateLatch, 0, // Started.
+ replBackupPutLatch, backupNodes.size(),
+ replBackupCommitLatch, backupNodes.size(),
+ replNearCreateLatch, 0, // Started.
+ replNearPutLatch, 0, // Near nodes able to start transactions on primaries (Cell 2),
+ replNearCommitLatch, nearNodes.size()); // But not able to commit, since some backups (Cell 1) still in switch.
+
+ checkUpcomingTransactionsState(
+ partBackupCreateLatch, 0, // Started.
+ partBackupPutLatch, backupNodes.size(),
+ partBackupCommitLatch, backupNodes.size(),
+ partNearCreateLatch, 0, // Started.
+ partNearPutLatch, 0, // Near nodes able to start transactions on primaries (Cell 2),
+ partNearCommitLatch, 0); // Able to commit, since all primaries and backups are in Cell 2.
+
+ checkTransactionsCount(
+ orig != failed ? orig : null, 1 /* partitioned */,
+ null, 0,
+ backupNodes, 1 /* partitioned */,
+ nearNodes, 0,
+ vers);
+
+ // Partitioned recovery.
+ for (Ignite ignite : G.allGrids()) {
+ TestRecordingCommunicationSpi spi =
+ (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+ spi.stopBlock(true, (t) -> true);
+ }
+
+ partCommitLatch.countDown();
+ partFut.get();
+
+ // Switches finished cluster-wide, all transactions can be committed.
+ checkUpcomingTransactionsState(
+ replBackupCreateLatch, 0,
+ replBackupPutLatch, 0,
+ replBackupCommitLatch, 0,
+ replNearCreateLatch, 0,
+ replNearPutLatch, 0,
+ replNearCommitLatch, 0);
+
+ checkUpcomingTransactionsState(
+ partBackupCreateLatch, 0,
+ partBackupPutLatch, 0,
+ partBackupCommitLatch, 0,
+ partNearCreateLatch, 0,
+ partNearPutLatch, 0,
+ partNearCommitLatch, 0);
+
+ // Check that pre-failure transactions are absent.
+ checkTransactionsCount(
+ orig != failed ? orig : null, 0,
+ null, 0,
+ backupNodes, 0,
+ nearNodes, 0,
+ vers);
+
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.get();
+
+ for (Ignite node : G.allGrids()) {
+ assertEquals(42, node.getOrCreateCache(PART_CACHE_NAME).get(partKey));
+ assertEquals(43, node.getOrCreateCache(REPL_CACHE_NAME).get(replKey));
+ }
+
+ // Final check that no transactions remain.
+ checkTransactionsCount(
+ null, 0,
+ null, 0,
+ backupNodes, 0,
+ nearNodes, 0,
+ null);
+ }
+
+ /**
+ * Checks each latch against its count via {@code checkTransactionsState}: backup create/put/commit, then near create/put/commit.
+ */
+ private void checkUpcomingTransactionsState(
+ CountDownLatch backupCreateLatch,
+ int backupCreateCnt,
+ CountDownLatch backupPutLatch,
+ int backupPutCnt,
+ CountDownLatch backupCommitLatch,
+ int backupCommitCnt,
+ CountDownLatch nearCreateLatch,
+ int nearCreateCnt,
+ CountDownLatch nearPutLatch,
+ int nearPutCnt,
+ CountDownLatch nearCommitLatch,
+ int nearCommitCnt) throws InterruptedException {
+ checkTransactionsState(backupCreateLatch, backupCreateCnt);
+ checkTransactionsState(backupPutLatch, backupPutCnt);
+ checkTransactionsState(backupCommitLatch, backupCommitCnt);
+ checkTransactionsState(nearCreateLatch, nearCreateCnt);
+ checkTransactionsState(nearPutLatch, nearPutCnt);
+ checkTransactionsState(nearCommitLatch, nearCommitCnt);
+ }
+
+ /**
+ * Waits for the latch when {@code cnt} is not positive, otherwise asserts the latch still holds exactly {@code cnt} counts.
+ */
+ private void checkTransactionsState(CountDownLatch latch, int cnt) throws InterruptedException {
+ if (cnt <= 0)
+ latch.await(); // Switch finished (finishing).
+ else
+ assertEquals(cnt, latch.getCount()); // Switch in progress.
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java
index cbc2a49..05774ce 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeSwitchTest.java
@@ -73,7 +73,7 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
private static final String CACHE_NAME = "testCache";
/** Cache configuration closure. */
- private IgniteClosure<String, CacheConfiguration[]> cacheC;
+ private IgniteClosure<String, CacheConfiguration<?,?>[]> cacheC;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -284,10 +284,10 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
/**
* @param nodes Nodes.
- * @param signleCnt Counter for GridDhtPartitionsSingleMessage.
+ * @param singleCnt Counter for GridDhtPartitionsSingleMessage.
* @param fullCnt Counter for GridDhtPartitionsFullMessage.
*/
- private void startPmeMessagesCounting(int nodes, AtomicLong signleCnt, AtomicLong fullCnt) {
+ private void startPmeMessagesCounting(int nodes, AtomicLong singleCnt, AtomicLong fullCnt) {
for (int i = 0; i < nodes; i++) {
TestRecordingCommunicationSpi spi =
(TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
@@ -296,7 +296,7 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
@Override public void apply(ClusterNode node, Message msg) {
if (msg.getClass().equals(GridDhtPartitionsSingleMessage.class) &&
((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null)
- signleCnt.incrementAndGet();
+ singleCnt.incrementAndGet();
if (msg.getClass().equals(GridDhtPartitionsFullMessage.class) &&
((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null)
@@ -337,12 +337,12 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
private void testNoTransactionsWaitAtNodeLeft(int backups, PartitionLossPolicy lossPlc) throws Exception {
persistence = true;
- String cacheName = "three-partitioned";
+ String cacheName = "partitioned";
try {
- cacheC = new IgniteClosure<String, CacheConfiguration[]>() {
- @Override public CacheConfiguration[] apply(String igniteInstanceName) {
- CacheConfiguration ccfg = new CacheConfiguration();
+ cacheC = new IgniteClosure<String, CacheConfiguration<?,?>[]>() {
+ @Override public CacheConfiguration<?,?>[] apply(String igniteInstanceName) {
+ CacheConfiguration<?,?> ccfg = new CacheConfiguration<>();
ccfg.setName(cacheName);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
@@ -558,12 +558,12 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
public void testLateAffinityAssignmentOnBackupLeftAndJoin() throws Exception {
String cacheName = "single-partitioned";
- cacheC = new IgniteClosure<String, CacheConfiguration[]>() {
- @Override public CacheConfiguration[] apply(String igniteInstanceName) {
- CacheConfiguration ccfg = new CacheConfiguration();
+ cacheC = new IgniteClosure<String, CacheConfiguration<?, ?>[]>() {
+ @Override public CacheConfiguration<?, ?>[] apply(String igniteInstanceName) {
+ CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>();
ccfg.setName(cacheName);
- ccfg.setAffinity(new Map1PartitionsTo2NodesAffinityFunction());
+ ccfg.setAffinity(new Map1PartitionTo2NodesAffinityFunction());
return new CacheConfiguration[] {ccfg};
}
@@ -662,11 +662,11 @@ public class GridExchangeFreeSwitchTest extends GridCommonAbstractTest {
/**
*
*/
- private static class Map1PartitionsTo2NodesAffinityFunction extends RendezvousAffinityFunction {
+ private static class Map1PartitionTo2NodesAffinityFunction extends RendezvousAffinityFunction {
/**
* Default constructor.
*/
- public Map1PartitionsTo2NodesAffinityFunction() {
+ public Map1PartitionTo2NodesAffinityFunction() {
super(false, 1);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java
index 2355c81..e27ddad 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java
@@ -45,6 +45,8 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheReadThroughEvictio
import org.apache.ignite.internal.processors.cache.IgniteCacheStoreCollectionTest;
import org.apache.ignite.internal.processors.cache.PartitionsExchangeOnDiscoveryHistoryOverflowTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentNodeJoinValidationTest;
+import org.apache.ignite.internal.processors.cache.distributed.GridExchangeFreeCellularSwitchComplexOperationsTest;
+import org.apache.ignite.internal.processors.cache.distributed.GridExchangeFreeCellularSwitchIsolationTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxIteratorSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.NotMappedPartitionInTxTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.IgniteCacheAtomicProtocolTest;
@@ -104,6 +106,10 @@ public class IgniteCacheMvccTestSuite5 {
ignoredTests.add(IgniteCacheReadThroughEvictionsVariationsSuite.class);
ignoredTests.add(ClientSlowDiscoveryTransactionRemapTest.class);
+ // Cellular switch can't be performed on MVCC caches, at least at the moment.
+ ignoredTests.add(GridExchangeFreeCellularSwitchIsolationTest.class);
+ ignoredTests.add(GridExchangeFreeCellularSwitchComplexOperationsTest.class);
+
return IgniteCacheTestSuite5.suite(ignoredTests);
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index 2301168..d3fec85 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@ -52,6 +52,8 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheStoreCollectionTes
import org.apache.ignite.internal.processors.cache.PartitionsExchangeOnDiscoveryHistoryOverflowTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentNodeJoinValidationTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentTest;
+import org.apache.ignite.internal.processors.cache.distributed.GridExchangeFreeCellularSwitchComplexOperationsTest;
+import org.apache.ignite.internal.processors.cache.distributed.GridExchangeFreeCellularSwitchIsolationTest;
import org.apache.ignite.internal.processors.cache.distributed.GridExchangeFreeSwitchTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheGroupsPartitionLossPolicySelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePartitionLossPolicySelfTest;
@@ -94,6 +96,9 @@ public class IgniteCacheTestSuite5 {
GridTestUtils.addTestIfNeeded(suite, CacheLateAffinityAssignmentTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheLateAffinityAssignmentNodeJoinValidationTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridExchangeFreeSwitchTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, GridExchangeFreeCellularSwitchComplexOperationsTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, GridExchangeFreeCellularSwitchIsolationTest.class, ignoredTests);
+
GridTestUtils.addTestIfNeeded(suite, EntryVersionConsistencyReadThroughTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteCacheSyncRebalanceModeSelfTest.class, ignoredTests);