Posted to commits@ignite.apache.org by ag...@apache.org on 2019/02/04 11:44:09 UTC

[ignite] branch master updated: IGNITE-10876 Parallel execution of affinity changes on coordinator - Fixes #5942.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 51accac  IGNITE-10876 Parallel execution of affinity changes on coordinator - Fixes #5942.
51accac is described below

commit 51accac9f748689be72bfbce390a19fb8a6896f8
Author: Pavel Voronkin <pv...@gridgain.com>
AuthorDate: Mon Feb 4 13:48:16 2019 +0300

    IGNITE-10876 Parallel execution of affinity changes on coordinator - Fixes #5942.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
 .../cache/GridCachePartitionExchangeManager.java   |  17 --
 .../dht/preloader/CacheGroupAffinityMessage.java   |  25 ++-
 .../preloader/GridDhtPartitionsExchangeFuture.java | 193 +++++++++++++--------
 .../dht/topology/GridDhtPartitionTopologyImpl.java | 139 +++++++--------
 .../cache/distributed/CacheParallelStartTest.java  |  54 +++---
 5 files changed, 232 insertions(+), 196 deletions(-)
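
The core of the change: loops over cache groups and received single messages that previously ran sequentially on the coordinator now fan out over the system pool via U.doInParallel(...), reserving two threads for other system operations. Below is a JDK-only sketch of that fan-out-and-join pattern; the class and method names are illustrative and not part of Ignite, and the real helper additionally bounds parallelism via U.availableThreadCount(...) as seen in the hunks that follow.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.function.Consumer;

    /** Simplified, JDK-only stand-in for the U.doInParallel(...) helper this patch relies on. */
    public class ParallelForEach {
        /** Runs the action for every item on the given executor and waits for all of them. */
        public static <T> void run(ExecutorService exec, Collection<T> items, Consumer<T> action) throws Exception {
            List<Callable<Void>> tasks = new ArrayList<>(items.size());

            for (T item : items)
                tasks.add(() -> {
                    action.accept(item);

                    return null;
                });

            // invokeAll blocks until every task finishes; get() re-throws the first failure,
            // which the patch wraps into IgniteException at the call sites.
            for (Future<Void> fut : exec.invokeAll(tasks))
                fut.get();
        }

        public static void main(String[] args) throws Exception {
            ExecutorService exec = Executors.newFixedThreadPool(4);

            try {
                run(exec, Arrays.asList("grp-1", "grp-2", "grp-3"),
                    grp -> System.out.println(Thread.currentThread().getName() + " -> " + grp));
            }
            finally {
                exec.shutdown();
            }
        }
    }
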

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 525c41a..71a704c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -200,9 +200,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** */
     private GridFutureAdapter<?> reconnectExchangeFut;
 
-    /** */
-    private final Object interruptLock = new Object();
-
     /**
      * Partition map futures.
      * This set also contains already completed exchange futures to address race conditions when coordinator
@@ -818,13 +815,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @return Interrupt lock.
-     */
-    public Object interruptLock() {
-        return interruptLock;
-    }
-
-    /**
      * @param grpId Cache group ID.
      * @return Topology.
      */
@@ -2639,13 +2629,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             }
         }
 
-        /** {@inheritDoc} */
-        @Override public void cancel() {
-            synchronized (interruptLock) {
-                super.cancel();
-            }
-        }
-
         /**
          * Add custom exchange task.
          *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
index 695eadc..ef56834 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
@@ -135,25 +136,23 @@ public class CacheGroupAffinityMessage implements Message {
     }
 
     /**
+     * Fill Map of CacheGroupAffinityMessages.
+     *
      * @param cctx Context.
      * @param topVer Topology version.
      * @param affReq Cache group IDs.
      * @param cachesAff Optional already prepared affinity.
-     * @return Affinity.
      */
-    static Map<Integer, CacheGroupAffinityMessage> createAffinityMessages(
+    static void createAffinityMessages(
         GridCacheSharedContext cctx,
         AffinityTopologyVersion topVer,
         Collection<Integer> affReq,
-        @Nullable Map<Integer, CacheGroupAffinityMessage> cachesAff
+        Map<Integer, CacheGroupAffinityMessage> cachesAff
     ) {
         assert !F.isEmpty(affReq) : affReq;
 
-        if (cachesAff == null)
-            cachesAff = U.newHashMap(affReq.size());
-
         for (Integer grpId : affReq) {
-            if (!cachesAff.containsKey(grpId)) {
+            cachesAff.computeIfAbsent(grpId, (integer) -> {
                 GridAffinityAssignmentCache aff = cctx.affinity().groupAffinity(grpId);
 
                 // If no coordinator group holder on the node, try fetch affinity from existing cache group.
@@ -169,15 +168,13 @@ public class CacheGroupAffinityMessage implements Message {
 
                 List<List<ClusterNode>> assign = aff.readyAssignments(topVer);
 
-                CacheGroupAffinityMessage msg = new CacheGroupAffinityMessage(assign,
+                return new CacheGroupAffinityMessage(
+                    assign,
                     aff.centralizedAffinityFunction() ? aff.idealAssignment() : null,
-                    null);
-
-                cachesAff.put(grpId, msg);
-            }
+                    null
+                );
+            });
         }
-
-        return cachesAff;
     }
 
     /**
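
A note on the createAffinityMessages() refactoring above: the method now fills a caller-supplied map instead of allocating and returning one, so the coordinator can populate a single ConcurrentHashMap from several exchange threads (see the GridDhtPartitionsExchangeFuture changes below). The containsKey()/put() pair is therefore replaced by computeIfAbsent(), which is atomic per key on ConcurrentHashMap. A minimal sketch of the difference, using an illustrative String payload in place of CacheGroupAffinityMessage:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    /** Minimal illustration of why the patch switches from containsKey()/put() to computeIfAbsent(). */
    public class ComputeIfAbsentDemo {
        public static void main(String[] args) {
            Map<Integer, String> cachesAff = new ConcurrentHashMap<>();

            // Old shape: check-then-insert. Under concurrent callers two threads can both observe
            // "absent" and both build a message; this is only safe when a single thread fills the map.
            int grpId = 1;

            if (!cachesAff.containsKey(grpId))
                cachesAff.put(grpId, buildMessage(grpId));

            // New shape: computeIfAbsent() is atomic per key on ConcurrentHashMap, so the mapping
            // function runs at most once per group even when several exchange threads race.
            cachesAff.computeIfAbsent(2, ComputeIfAbsentDemo::buildMessage);

            System.out.println(cachesAff);
        }

        /** Hypothetical stand-in for building a CacheGroupAffinityMessage for a group. */
        private static String buildMessage(int grpId) {
            return "affinity-msg-" + grpId;
        }
    }
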
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 5cfa56e..f4043ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -3108,25 +3109,31 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @param resTopVer Result topology version.
      */
     private void detectLostPartitions(AffinityTopologyVersion resTopVer) {
-        boolean detected = false;
+        AtomicInteger detected = new AtomicInteger();
 
-        synchronized (cctx.exchange().interruptLock()) {
-            if (Thread.currentThread().isInterrupted())
-                return;
-
-            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                if (!grp.isLocal()) {
-                    // Do not trigger lost partition events on start.
-                    boolean event = !localJoinExchange() && !activateCluster();
+        try {
+            // Reserve at least 2 threads for system operations.
+            U.doInParallel(
+                U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2),
+                cctx.kernalContext().getSystemExecutorService(),
+                cctx.cache().cacheGroups(),
+                grp -> {
+                    if (!grp.isLocal()) {
+                        // Do not trigger lost partition events on start.
+                        boolean evt = !localJoinExchange() && !activateCluster();
 
-                    boolean detectedOnGrp = grp.topology().detectLostPartitions(resTopVer, event ? events().lastEvent() : null);
+                        if (grp.topology().detectLostPartitions(resTopVer, evt ? events().lastEvent() : null))
+                            detected.incrementAndGet();
+                    }
 
-                    detected |= detectedOnGrp;
-                }
-            }
+                    return null;
+                });
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
         }
 
-        if (detected) {
+        if (detected.get() > 0) {
             if (log.isDebugEnabled())
                 log.debug("Partitions have been scheduled to resend [reason=" +
                     "Lost partitions detect on " + resTopVer + "]");
@@ -3143,22 +3150,29 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private void resetLostPartitions(Collection<String> cacheNames) {
         assert !exchCtx.mergeExchanges();
 
-        synchronized (cctx.exchange().interruptLock()) {
-            if (Thread.currentThread().isInterrupted())
-                return;
-
-            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                if (grp.isLocal())
-                    continue;
+        try {
+            // Reserve at least 2 threads for system operations.
+            U.doInParallel(
+                U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2),
+                cctx.kernalContext().getSystemExecutorService(),
+                cctx.cache().cacheGroups(),
+                grp -> {
+                    if (grp.isLocal())
+                        return null;
 
-                for (String cacheName : cacheNames) {
-                    if (grp.hasCache(cacheName)) {
-                        grp.topology().resetLostPartitions(initialVersion());
+                    for (String cacheName : cacheNames) {
+                        if (grp.hasCache(cacheName)) {
+                            grp.topology().resetLostPartitions(initialVersion());
 
-                        break;
+                            break;
+                        }
                     }
-                }
-            }
+
+                    return null;
+                });
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
         }
     }
 
@@ -3295,6 +3309,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             Map<Integer, CacheGroupAffinityMessage> idealAffDiff = null;
 
+            // Reserve at least 2 threads for system operations.
+            int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
+
             if (exchCtx.mergeExchanges()) {
                 synchronized (mux) {
                     if (mergedJoinExchMsgs != null) {
@@ -3315,51 +3332,65 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 else
                     cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, true);
 
-                for (CacheGroupDescriptor desc : cctx.affinity().cacheGroups().values()) {
-                    if (desc.config().getCacheMode() == CacheMode.LOCAL)
-                        continue;
 
-                    CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
+                U.doInParallel(
+                    parallelismLvl,
+                    cctx.kernalContext().getSystemExecutorService(),
+                    cctx.affinity().cacheGroups().values(),
+                    desc -> {
+                        if (desc.config().getCacheMode() == CacheMode.LOCAL)
+                            return null;
 
-                    GridDhtPartitionTopology top = grp != null ? grp.topology() :
-                        cctx.exchange().clientTopology(desc.groupId(), events().discoveryCache());
+                        CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
 
-                    top.beforeExchange(this, true, true);
-                }
+                        GridDhtPartitionTopology top = grp != null ? grp.topology() :
+                            cctx.exchange().clientTopology(desc.groupId(), events().discoveryCache());
+
+                        top.beforeExchange(this, true, true);
+
+                        return null;
+                    });
             }
 
             timeBag.finishGlobalStage("Affinity recalculation (crd)");
 
-            Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
+            Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = new ConcurrentHashMap<>(cctx.cache().cacheGroups().size());
 
-            for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
-                GridDhtPartitionsSingleMessage msg = e.getValue();
+            U.doInParallel(
+                parallelismLvl,
+                cctx.kernalContext().getSystemExecutorService(),
+                msgs.entrySet(),
+                entry -> {
+                    GridDhtPartitionsSingleMessage msg = entry.getValue();
 
-                // Apply update counters after all single messages are received.
-                for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
-                    Integer grpId = entry.getKey();
+                    for (Map.Entry<Integer, GridDhtPartitionMap> e : msg.partitions().entrySet()) {
+                        Integer grpId = e.getKey();
 
-                    CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+                        CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                    GridDhtPartitionTopology top = grp != null ? grp.topology() :
-                        cctx.exchange().clientTopology(grpId, events().discoveryCache());
+                        GridDhtPartitionTopology top = grp != null
+                            ? grp.topology()
+                            : cctx.exchange().clientTopology(grpId, events().discoveryCache());
 
-                    CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId,
-                        top.partitions());
+                        CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId, top.partitions());
 
-                    if (cntrs != null)
-                        top.collectUpdateCounters(cntrs);
-                }
+                        if (cntrs != null)
+                            top.collectUpdateCounters(cntrs);
+                    }
 
-                Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+                    Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
 
-                if (affReq != null) {
-                    joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx,
-                        resTopVer,
-                        affReq,
-                        joinedNodeAff);
+                    if (affReq != null)
+                        CacheGroupAffinityMessage.createAffinityMessages(
+                            cctx,
+                            resTopVer,
+                            affReq,
+                            joinedNodeAff
+                        );
+
+                    return null;
                 }
-            }
+            );
 
             timeBag.finishGlobalStage("Collect update counters and create affinity messages");
 
@@ -3708,14 +3739,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
 
+
         if (affReq != null) {
-            Map<Integer, CacheGroupAffinityMessage> aff = CacheGroupAffinityMessage.createAffinityMessages(
+            Map<Integer, CacheGroupAffinityMessage> cachesAff = U.newHashMap(affReq.size());
+
+            CacheGroupAffinityMessage.createAffinityMessages(
                 cctx,
                 finishState.resTopVer,
                 affReq,
-                null);
+                cachesAff);
 
-            fullMsg.joinedNodeAffinity(aff);
+            fullMsg.joinedNodeAffinity(cachesAff);
         }
 
         if (!fullMsg.exchangeId().equals(msg.exchangeId())) {
@@ -4572,21 +4606,38 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs = newCrdFut.messages();
 
                 if (!F.isEmpty(msgs)) {
-                    Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
+                    Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = new ConcurrentHashMap<>();
 
-                    for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
-                        this.msgs.put(e.getKey().id(), e.getValue());
+                    // Reserve at least 2 threads for system operations.
+                    int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
 
-                        GridDhtPartitionsSingleMessage msg = e.getValue();
-
-                        Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+                    try {
+                        U.doInParallel(
+                            parallelismLvl,
+                            cctx.kernalContext().getSystemExecutorService(),
+                            msgs.entrySet(),
+                            entry -> {
+                                this.msgs.put(entry.getKey().id(), entry.getValue());
+
+                                GridDhtPartitionsSingleMessage msg = entry.getValue();
+
+                                Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+
+                                if (!F.isEmpty(affReq)) {
+                                    CacheGroupAffinityMessage.createAffinityMessages(
+                                        cctx,
+                                        fullMsg.resultTopologyVersion(),
+                                        affReq,
+                                        joinedNodeAff
+                                    );
+                                }
 
-                        if (!F.isEmpty(affReq)) {
-                            joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx,
-                                fullMsg.resultTopologyVersion(),
-                                affReq,
-                                joinedNodeAff);
-                        }
+                                return null;
+                            }
+                        );
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
                     }
 
                     Map<UUID, GridDhtPartitionsSingleMessage> mergedJoins = newCrdFut.mergedJoinExchangeMessages();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index b0decae..ba11df7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -505,108 +505,103 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         ctx.database().checkpointReadLock();
 
         try {
-            synchronized (ctx.exchange().interruptLock()) {
-                if (Thread.currentThread().isInterrupted())
-                    throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread());
-
-                U.writeLock(lock);
+            U.writeLock(lock);
 
-                try {
-                    if (stopping)
-                        return;
+            try {
+                if (stopping)
+                    return;
 
-                    assert lastTopChangeVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + lastTopChangeVer +
-                        ", exchId=" + exchFut.exchangeId() + ']';
+                assert lastTopChangeVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + lastTopChangeVer +
+                    ", exchId=" + exchFut.exchangeId() + ']';
 
-                    ExchangeDiscoveryEvents evts = exchFut.context().events();
+                ExchangeDiscoveryEvents evts = exchFut.context().events();
 
-                    if (affReady) {
-                        assert grp.affinity().lastVersion().equals(evts.topologyVersion()) : "Invalid affinity version [" +
-                            "grp=" + grp.cacheOrGroupName() +
-                            ", affVer=" + grp.affinity().lastVersion() +
-                            ", evtsVer=" + evts.topologyVersion() + ']';
+                if (affReady) {
+                    assert grp.affinity().lastVersion().equals(evts.topologyVersion()) : "Invalid affinity version [" +
+                        "grp=" + grp.cacheOrGroupName() +
+                        ", affVer=" + grp.affinity().lastVersion() +
+                        ", evtsVer=" + evts.topologyVersion() + ']';
 
-                        lastTopChangeVer = readyTopVer = evts.topologyVersion();
+                    lastTopChangeVer = readyTopVer = evts.topologyVersion();
 
-                        discoCache = evts.discoveryCache();
-                    }
+                    discoCache = evts.discoveryCache();
+                }
 
-                    if (log.isDebugEnabled()) {
-                        log.debug("Partition map beforeExchange [grp=" + grp.cacheOrGroupName() +
-                            ", exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']');
-                    }
+                if (log.isDebugEnabled()) {
+                    log.debug("Partition map beforeExchange [grp=" + grp.cacheOrGroupName() +
+                        ", exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']');
+                }
 
-                    long updateSeq = this.updateSeq.incrementAndGet();
+                long updateSeq = this.updateSeq.incrementAndGet();
 
-                    cntrMap.clear();
+                cntrMap.clear();
 
-                    initializeFullMap(updateSeq);
+                initializeFullMap(updateSeq);
 
-                    boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
+                boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
 
-                    if (evts.hasServerLeft()) {
-                        List<DiscoveryEvent> evts0 = evts.events();
+                if (evts.hasServerLeft()) {
+                    List<DiscoveryEvent> evts0 = evts.events();
 
-                        for (int i = 0; i < evts0.size(); i++) {
-                            DiscoveryEvent evt = evts0.get(i);
+                    for (int i = 0; i < evts0.size(); i++) {
+                        DiscoveryEvent evt = evts0.get(i);
 
-                            if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
-                                removeNode(evt.eventNode().id());
-                        }
+                        if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
+                            removeNode(evt.eventNode().id());
                     }
+                }
 
-                    if (grp.affinityNode()) {
-                        if (grpStarted ||
-                            exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT ||
-                            exchFut.serverNodeDiscoveryEvent()) {
-
-                            AffinityTopologyVersion affVer;
-                            List<List<ClusterNode>> affAssignment;
+                if (grp.affinityNode()) {
+                    if (grpStarted ||
+                        exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT ||
+                        exchFut.serverNodeDiscoveryEvent()) {
 
-                            if (affReady) {
-                                affVer = evts.topologyVersion();
+                        AffinityTopologyVersion affVer;
+                        List<List<ClusterNode>> affAssignment;
 
-                                assert grp.affinity().lastVersion().equals(affVer) :
-                                        "Invalid affinity [topVer=" + grp.affinity().lastVersion() +
-                                                ", grp=" + grp.cacheOrGroupName() +
-                                                ", affVer=" + affVer +
-                                                ", fut=" + exchFut + ']';
+                        if (affReady) {
+                            affVer = evts.topologyVersion();
 
-                                affAssignment = grp.affinity().readyAssignments(affVer);
-                            }
-                            else {
-                                assert !exchFut.context().mergeExchanges();
+                            assert grp.affinity().lastVersion().equals(affVer) :
+                                    "Invalid affinity [topVer=" + grp.affinity().lastVersion() +
+                                            ", grp=" + grp.cacheOrGroupName() +
+                                            ", affVer=" + affVer +
+                                            ", fut=" + exchFut + ']';
 
-                                affVer = exchFut.initialVersion();
-                                affAssignment = grp.affinity().idealAssignment();
-                            }
+                            affAssignment = grp.affinity().readyAssignments(affVer);
+                        }
+                        else {
+                            assert !exchFut.context().mergeExchanges();
 
-                            initPartitions(affVer, affAssignment, exchFut, updateSeq);
+                            affVer = exchFut.initialVersion();
+                            affAssignment = grp.affinity().idealAssignment();
                         }
-                    }
 
-                    consistencyCheck();
+                        initPartitions(affVer, affAssignment, exchFut, updateSeq);
+                    }
+                }
 
-                    if (updateMoving) {
-                        assert grp.affinity().lastVersion().equals(evts.topologyVersion());
+                consistencyCheck();
 
-                        createMovingPartitions(grp.affinity().readyAffinity(evts.topologyVersion()));
-                    }
+                if (updateMoving) {
+                    assert grp.affinity().lastVersion().equals(evts.topologyVersion());
 
-                    if (log.isDebugEnabled()) {
-                        log.debug("Partition map after beforeExchange [grp=" + grp.cacheOrGroupName() + ", " +
-                            "exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']');
-                    }
+                    createMovingPartitions(grp.affinity().readyAffinity(evts.topologyVersion()));
+                }
 
-                    if (log.isTraceEnabled()) {
-                        log.trace("Partition states after beforeExchange [grp=" + grp.cacheOrGroupName()
-                            + ", exchId=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']');
-                    }
+                if (log.isDebugEnabled()) {
+                    log.debug("Partition map after beforeExchange [grp=" + grp.cacheOrGroupName() + ", " +
+                        "exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']');
                 }
-                finally {
-                    lock.writeLock().unlock();
+
+                if (log.isTraceEnabled()) {
+                    log.trace("Partition states after beforeExchange [grp=" + grp.cacheOrGroupName()
+                        + ", exchId=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']');
                 }
             }
+            finally {
+                lock.writeLock().unlock();
+            }
         }
         finally {
             ctx.database().checkpointReadUnlock();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheParallelStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheParallelStartTest.java
index f8ce7b4..d01d822 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheParallelStartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheParallelStartTest.java
@@ -38,7 +38,11 @@ import org.junit.Test;
  */
 public class CacheParallelStartTest extends GridCommonAbstractTest {
     /** */
-    private static final int CACHES_COUNT = 500;
+    private static final int CACHES_COUNT = 5000;
+
+    /** */
+    private static final int GROUPS_COUNT = 50;
+
 
     /** */
     private static final String STATIC_CACHE_PREFIX = "static-cache-";
@@ -54,11 +58,15 @@ public class CacheParallelStartTest extends GridCommonAbstractTest {
 
         cfg.setSystemThreadPoolSize(Runtime.getRuntime().availableProcessors() * 3);
 
-        long sz = 100 * 1024 * 1024;
+        long sz = 512 * 1024 * 1024;
 
         DataStorageConfiguration memCfg = new DataStorageConfiguration().setPageSize(1024)
                 .setDefaultDataRegionConfiguration(
-                        new DataRegionConfiguration().setPersistenceEnabled(false).setInitialSize(sz).setMaxSize(sz))
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(false)
+                        .setInitialSize(sz)
+                        .setMaxSize(sz)
+                )
                 .setWalMode(WALMode.LOG_ONLY).setCheckpointFrequency(24L * 60 * 60 * 1000);
 
         cfg.setDataStorageConfiguration(memCfg);
@@ -66,7 +74,7 @@ public class CacheParallelStartTest extends GridCommonAbstractTest {
         ArrayList<Object> staticCaches = new ArrayList<>(CACHES_COUNT);
 
         for (int i = 0; i < CACHES_COUNT; i++)
-            staticCaches.add(cacheConfiguration(STATIC_CACHE_PREFIX + i));
+            staticCaches.add(cacheConfiguration(STATIC_CACHE_PREFIX, i));
 
         cfg.setCacheConfiguration(staticCaches.toArray(new CacheConfiguration[CACHES_COUNT]));
 
@@ -77,12 +85,12 @@ public class CacheParallelStartTest extends GridCommonAbstractTest {
      * @param cacheName Cache name.
      * @return Cache configuration.
      */
-    private CacheConfiguration cacheConfiguration(String cacheName) {
+    private CacheConfiguration cacheConfiguration(String cacheName, int i) {
         CacheConfiguration cfg = defaultCacheConfiguration();
 
-        cfg.setName(cacheName);
+        cfg.setName(cacheName + i);
         cfg.setBackups(1);
-        cfg.setGroupName(STATIC_CACHE_CACHE_GROUP_NAME);
+        cfg.setGroupName(STATIC_CACHE_CACHE_GROUP_NAME + i % GROUPS_COUNT);
         cfg.setIndexedTypes(Long.class, Long.class);
 
         return cfg;
@@ -175,20 +183,22 @@ public class CacheParallelStartTest extends GridCommonAbstractTest {
      *
      */
     private void assertCaches(IgniteEx igniteEx) {
-        Collection<GridCacheContext> caches = igniteEx
-                .context()
-                .cache()
-                .cacheGroup(CU.cacheId(STATIC_CACHE_CACHE_GROUP_NAME))
-                .caches();
-
-        assertEquals(CACHES_COUNT, caches.size());
-
-        @Nullable CacheGroupContext cacheGroup = igniteEx
-                .context()
-                .cache()
-                .cacheGroup(CU.cacheId(STATIC_CACHE_CACHE_GROUP_NAME));
-
-        for (GridCacheContext cacheContext : caches)
-            assertEquals(cacheContext.group(), cacheGroup);
+        for (int i = 0; i < GROUPS_COUNT; i++) {
+            Collection<GridCacheContext> caches = igniteEx
+                    .context()
+                    .cache()
+                    .cacheGroup(CU.cacheId(STATIC_CACHE_CACHE_GROUP_NAME + i))
+                    .caches();
+
+            assertEquals(CACHES_COUNT / GROUPS_COUNT, caches.size());
+
+            @Nullable CacheGroupContext cacheGrp = igniteEx
+                    .context()
+                    .cache()
+                    .cacheGroup(CU.cacheId(STATIC_CACHE_CACHE_GROUP_NAME + i));
+
+            for (GridCacheContext cacheContext : caches)
+                assertEquals(cacheContext.group(), cacheGrp);
+        }
     }
 }