You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/13 14:36:01 UTC

[32/50] ignite git commit: ignite-5578 Affinity for local join

ignite-5578 Affinity for local join


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0c712412
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0c712412
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0c712412

Branch: refs/heads/ignite-5578
Commit: 0c7124122e8eebaad1a85f844277a5cf1564a8de
Parents: 62e3e70
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 12 11:32:41 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 12 15:49:44 2017 +0300

----------------------------------------------------------------------
 .../affinity/GridAffinityAssignmentCache.java   |   8 +
 .../cache/CacheAffinitySharedManager.java       | 105 ++++++----
 .../GridCachePartitionExchangeManager.java      |  19 +-
 .../GridDhtPartitionsExchangeFuture.java        | 191 +++++++++++++------
 4 files changed, 233 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0c712412/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a8c6c59..a8ac825 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -337,6 +337,14 @@ public class GridAffinityAssignmentCache {
     }
 
     /**
+     * @param topVer Topology version to compare with.
+     * @return {@code True} if the given version equals the last calculated affinity version.
+     */
+    public boolean lastVersionEquals(AffinityTopologyVersion topVer) {
+        return topVer.equals(lastVersion());
+    }
+
+    /**
      * @return Last calculated affinity version.
      */
     public AffinityTopologyVersion lastVersion() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c712412/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 45586c7..3f24547 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1230,7 +1230,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     public GridAffinityAssignmentCache affinity(Integer grpId) {
         CacheGroupHolder grpHolder = grpHolders.get(grpId);
 
-        assert grpHolder != null : grpId;
+        assert grpHolder != null : debugGroupName(grpId);
 
         return grpHolder.affinity();
     }
@@ -1311,6 +1311,19 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param grpId Group ID.
+     * @return Cache or group name for debug purpose, or an "Unknown group" placeholder if no descriptor is found.
+     */
+    private String debugGroupName(int grpId) {
+        CacheGroupDescriptor desc = caches.group(grpId);
+
+        if (desc != null)
+            return desc.cacheOrGroupName();
+        else
+            return "Unknown group: " + grpId;
+    }
+
+    /**
      * @param fut Exchange future.
      * @throws IgniteCheckedException If failed.
      */
@@ -1450,8 +1463,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      * @return Future completed when caches initialization is done.
      */
-    private IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut)
-        throws IgniteCheckedException {
+    public IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut,
+        final boolean newAff) throws IgniteCheckedException {
         final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>();
 
         forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
@@ -1483,51 +1496,75 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                     final GridAffinityAssignmentCache aff = grpHolder.affinity();
 
-                    List<GridDhtPartitionsExchangeFuture> exchFuts = cctx.exchange().exchangeFutures();
+                    if (newAff) {
+                        if (!aff.lastVersionEquals(fut.topologyVersion())) {
+                            List<List<ClusterNode>> assign =
+                                aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
 
-                    int idx = exchFuts.indexOf(fut);
+                            aff.initialize(fut.topologyVersion(), assign);
+                        }
+                    }
+                    else {
+                        List<GridDhtPartitionsExchangeFuture> exchFuts = cctx.exchange().exchangeFutures();
 
-                    assert idx >= 0 && idx < exchFuts.size() - 1 : "Invalid exchange futures state [cur=" + idx +
-                        ", total=" + exchFuts.size() + ']';
+                        int idx = exchFuts.indexOf(fut);
 
-                    final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1);
+                        assert idx >= 0 && idx < exchFuts.size() - 1 : "Invalid exchange futures state [cur=" + idx +
+                            ", total=" + exchFuts.size() + ']';
 
-                    if (log.isDebugEnabled()) {
-                        log.debug("Need initialize affinity on coordinator [" +
-                            "cacheGrp=" + desc.cacheOrGroupName() +
-                            "prevAff=" + prev.topologyVersion() + ']');
-                    }
+                        final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1);
 
-                    assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev;
+                        if (log.isDebugEnabled()) {
+                            log.debug("Need initialize affinity on coordinator [" +
+                                "cacheGrp=" + desc.cacheOrGroupName() +
+                                ", prevAff=" + prev.topologyVersion() + ']');
+                        }
 
-                    GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                        desc.groupId(),
-                        prev.topologyVersion(),
-                        prev.discoCache());
+                        assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev;
 
-                    fetchFut.init(false);
+                        GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
+                            desc.groupId(),
+                            prev.topologyVersion(),
+                            prev.discoCache());
 
-                    final GridFutureAdapter<AffinityTopologyVersion> affFut = new GridFutureAdapter<>();
+                        fetchFut.init(false);
 
-                    fetchFut.listen(new IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>() {
-                        @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
-                            throws IgniteCheckedException {
-                            fetchAffinity(prev.topologyVersion(),
-                                prev.discoveryEvent(),
-                                prev.discoCache(),
-                                aff, (GridDhtAssignmentFetchFuture)fetchFut);
+                        final GridFutureAdapter<AffinityTopologyVersion> affFut = new GridFutureAdapter<>();
 
-                            aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+                        fetchFut.listen(new IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>() {
+                            @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
+                                throws IgniteCheckedException {
+                                fetchAffinity(prev.topologyVersion(),
+                                    prev.discoveryEvent(),
+                                    prev.discoCache(),
+                                    aff,
+                                    (GridDhtAssignmentFetchFuture)fetchFut);
 
-                            affFut.onDone(fut.topologyVersion());
-                        }
-                    });
+                                aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+
+                                affFut.onDone(fut.topologyVersion());
+                            }
+                        });
 
-                    futs.add(affFut);
+                        futs.add(affFut);
+                    }
                 }
-                else
+                else {
                     grpHolder = new CacheGroupHolder1(grp, null);
 
+                    if (newAff) {
+                        GridAffinityAssignmentCache aff = grpHolder.affinity();
+
+                        if (!aff.lastVersionEquals(fut.topologyVersion())) {
+                            List<List<ClusterNode>> assign = aff.calculate(fut.topologyVersion(),
+                                fut.discoveryEvent(),
+                                fut.discoCache());
+
+                            aff.initialize(fut.topologyVersion(), assign);
+                        }
+                    }
+                }
+
                 CacheGroupHolder old = grpHolders.put(grpHolder.groupId(), grpHolder);
 
                 assert old == null : old;
@@ -1757,7 +1794,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      */
     public IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> initAffinityOnNodeLeft(
         final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
-        IgniteInternalFuture<?> initFut = initCoordinatorCaches(fut);
+        IgniteInternalFuture<?> initFut = initCoordinatorCaches(fut, false);
 
         if (initFut != null && !initFut.isDone()) {
             final GridFutureAdapter<Map<Integer, Map<Integer, List<UUID>>>> resFut = new GridFutureAdapter<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c712412/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 018537c..51214e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1438,7 +1438,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             return;
 
         try {
-            sendLocalPartitions(node, msg.exchangeId());
+            List<GridDhtPartitionsExchangeFuture> futs = exchangeFutures();
+
+            GridDhtPartitionsExchangeFuture fut = null;
+
+            for (int i = futs.size() - 1; i >= 0; i--) {
+                GridDhtPartitionsExchangeFuture fut0 = futs.get(i);
+
+                if (fut0.exchangeId().equals(msg.exchangeId())) {
+                    fut = fut0;
+
+                    break;
+                }
+            }
+
+            if (fut != null)
+                fut.processSinglePartitionRequest(node, msg);
+            else
+                sendLocalPartitions(node, msg.exchangeId());
         }
         finally {
             leaveBusy();

http://git-wip-us.apache.org/repos/asf/ignite/blob/0c712412/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 81b288c..4a39bae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -136,6 +136,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** */
     private ClusterNode crd;
 
+    /** */
+    private boolean crdReady;
+
     /** ExchangeFuture id. */
     private final GridDhtPartitionExchangeId exchId;
 
@@ -169,7 +172,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * Messages received on non-coordinator are stored in case if this node
      * becomes coordinator.
      */
-    private final Map<ClusterNode, GridDhtPartitionsSingleMessage> singleMsgs = new ConcurrentHashMap8<>();
+    private final Map<UUID, GridDhtPartitionsSingleMessage> singleMsgs = new ConcurrentHashMap8<>();
 
     /** Messages received from new coordinator. */
     private final Map<ClusterNode, GridDhtPartitionsFullMessage> fullMsgs = new ConcurrentHashMap8<>();
@@ -224,6 +227,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private ExchangeContext exchCtx;
 
+    /** */
+    private FinishState finishState;
+
     /**
      * @param cctx Cache context.
      * @param busyLock Busy lock.
@@ -452,6 +458,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             boolean crdNode = crd != null && crd.isLocal();
 
+            if (crdNode)
+                crdReady = true;
+
             exchLog.info("Started exchange init [topVer=" + topVer +
                 ", crd=" + crdNode +
                 ", evt=" + discoEvt.type() +
@@ -1434,44 +1443,44 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         return;
                     }
 
-                    processMessage(node, msg);
+                    processMessage(node.id(), msg);
                 }
             });
         }
     }
 
     /**
-     * @param node Sender node.
+     * @param nodeId Sender node.
      * @param msg Message.
      */
-    private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+    private void processMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
         boolean allReceived = false;
         boolean updateSingleMap = false;
 
         synchronized (this) {
             assert crd != null;
 
-            if (crd.isLocal()) {
-                if (remaining.remove(node.id())) {
+            if (crd.isLocal() && crdReady) {
+                if (remaining.remove(nodeId)) {
                     updateSingleMap = true;
 
                     pendingSingleUpdates++;
 
                     if (stateChangeExchange() && msg.getError() != null)
-                        changeGlobalStateExceptions.put(node.id(), msg.getError());
+                        changeGlobalStateExceptions.put(nodeId, msg.getError());
 
                     allReceived = remaining.isEmpty();
                 }
             }
             else
-                singleMsgs.put(node, msg);
+                singleMsgs.put(nodeId, msg);
         }
 
         if (updateSingleMap) {
             try {
                 // Do not update partition map, in case cluster transitioning to inactive state.
                 if (!deactivateCluster())
-                    updatePartitionSingleMap(node, msg);
+                    updatePartitionSingleMap(nodeId, msg);
             }
             finally {
                 synchronized (this) {
@@ -1747,17 +1756,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             if (!crd.equals(discoCache.serverNodes().get(0))) {
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                    if (!grp.isLocal()) {
-                        if (localJoinExchange() && grp.affinity().lastVersion().topologyVersion() == -1L) {
-                            List<List<ClusterNode>> aff = grp.affinity().calculate(topologyVersion(),
-                                discoEvt,
-                                discoCache);
-
-                            grp.affinity().initialize(topologyVersion(), aff);
-                        }
-
+                    if (!grp.isLocal())
                         grp.topology().beforeExchange(this, !centralizedAff);
-                    }
                 }
             }
 
@@ -2013,12 +2013,37 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @param node Sender node.
      * @param msg Message.
      */
+    public void processSinglePartitionRequest(final ClusterNode node, GridDhtPartitionsSingleRequest msg) {
+        if (!cctx.discovery().alive(node.id()))
+            return;
+
+        initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+            @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+                synchronized (GridDhtPartitionsExchangeFuture.this) { // Future's monitor guards finishState.
+                    if (finishState != null && node.id().equals(finishState.crdId))
+                        return;
+                }
+
+                try {
+                    sendLocalPartitions(node);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send message to coordinator: " + e, e);
+                }
+            }
+        });
+    }
+
+    /**
+     * @param node Sender node.
+     * @param msg Message.
+     */
     private void processMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
         assert exchId.equals(msg.exchangeId()) : msg;
         assert msg.lastVersion() != null : msg;
 
         synchronized (this) {
-            if (crd == null)
+            if (crd == null || finishState != null)
                 return;
 
             if (!crd.equals(node)) {
@@ -2031,6 +2056,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 return;
             }
+
+            finishState = new FinishState(crd.id());
         }
 
         Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin();
@@ -2040,7 +2067,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             Collection<CacheGroupAffinity> cachesAff = msg.cachesAffinity();
 
-            assert !F.isEmpty(cachesAff) : cachesAff;
+            assert !F.isEmpty(cachesAff) : msg;
             assert cachesAff.size() >= affReq.size();
 
             int cnt = 0;
@@ -2153,11 +2180,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /**
      * Updates partition map in all caches.
      *
-     * @param node Node sent message.
+     * @param nodeId Node message received from.
      * @param msg Partitions single message.
      */
-    private void updatePartitionSingleMap(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
-        msgs.put(node.id(), msg);
+    private void updatePartitionSingleMap(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
+        msgs.put(nodeId, msg);
 
         for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
             Integer grpId = entry.getKey();
@@ -2282,7 +2309,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     try {
                         boolean crdChanged = false;
                         boolean allReceived = false;
-                        Set<UUID> reqFrom = null;
+                        Set<UUID> remaining0 = null;
 
                         ClusterNode crd0;
 
@@ -2301,11 +2328,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                             }
 
                             if (crd != null && crd.isLocal()) {
-                                if (rmvd)
+                                if (crdChanged)
+                                    remaining0 = new HashSet<>(remaining);
+                                else if (crdReady && rmvd)
                                     allReceived = remaining.isEmpty();
-
-                                if (crdChanged && !remaining.isEmpty())
-                                    reqFrom = new HashSet<>(remaining);
                             }
 
                             crd0 = crd;
@@ -2334,35 +2360,30 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                             if (stateChangeExchange() && changeGlobalStateE != null)
                                 changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateE);
 
-                            if (allReceived) {
-                                awaitSingleMapUpdates();
-
-                                onAllReceived();
+                            if (crdChanged) {
+                                boolean newAff = localJoinExchange();
+
+                                IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(
+                                    GridDhtPartitionsExchangeFuture.this, newAff);
+
+                                if (fut == null || fut.isDone())
+                                    onBecomeCoordinator();
+                                else {
+                                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                                        @Override public void apply(IgniteInternalFuture<?> fut) {
+                                            onBecomeCoordinator();
+                                        }
+                                    });
+                                }
 
                                 return;
                             }
 
-                            if (crdChanged && reqFrom != null) {
-                                GridDhtPartitionsSingleRequest req = new GridDhtPartitionsSingleRequest(exchId);
-
-                                for (UUID nodeId : reqFrom) {
-                                    try {
-                                        // It is possible that some nodes finished exchange with previous coordinator.
-                                        cctx.io().send(nodeId, req, SYSTEM_POOL);
-                                    }
-                                    catch (ClusterTopologyCheckedException ignored) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Node left during partition exchange [nodeId=" + nodeId +
-                                                ", exchId=" + exchId + ']');
-                                    }
-                                    catch (IgniteCheckedException e) {
-                                        U.error(log, "Failed to request partitions from node: " + nodeId, e);
-                                    }
-                                }
-                            }
+                            if (allReceived) {
+                                awaitSingleMapUpdates();
 
-                            for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet())
-                                processMessage(m.getKey(), m.getValue());
+                                onAllReceived();
+                            }
                         }
                         else {
                             if (crdChanged) {
@@ -2373,11 +2394,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                             }
                         }
                     }
-                    catch (Exception e) {
+                    catch (IgniteCheckedException e) {
                         if (reconnectOnError(e))
                             onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
                         else
-                            throw e;
+                            U.error(log, "Failed to process node left event: " + e, e);
                     }
                     finally {
                         leaveBusy();
@@ -2391,6 +2412,51 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Sets {@code crdReady}, then requests single messages from remaining nodes and replays the stored ones.
+     */
+    private void onBecomeCoordinator() {
+        Set<UUID> remaining0 = null;
+
+        synchronized (this) {
+            assert crd != null && crd.isLocal();
+            assert !crdReady;
+
+            crdReady = true;
+
+            if (!remaining.isEmpty())
+                remaining0 = new HashSet<>(remaining);
+        }
+
+        if (remaining0 != null) {
+            // It is possible that some nodes finished exchange with previous coordinator.
+            GridDhtPartitionsSingleRequest req = new GridDhtPartitionsSingleRequest(exchId);
+
+            for (UUID nodeId : remaining0) {
+                try {
+                    if (!singleMsgs.containsKey(nodeId))
+                        cctx.io().send(nodeId, req, SYSTEM_POOL);
+                }
+                catch (ClusterTopologyCheckedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Node left during partition exchange [nodeId=" + nodeId +
+                            ", exchId=" + exchId + ']');
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to request partitions from node: " + nodeId, e);
+                }
+            }
+
+            for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet())
+                processMessage(m.getKey(), m.getValue());
+        }
+        else {
+            awaitSingleMapUpdates();
+
+            onAllReceived();
+        }
+    }
+
+    /**
      * @param e Exception.
      * @return {@code True} if local node should try reconnect in case of error.
      */
@@ -2529,4 +2595,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         return nextTimeout <= limit ? nextTimeout : limit;
     }
+
+    /**
+     * Holds the ID of the coordinator node recorded when the exchange finish state is set.
+     */
+    private static class FinishState {
+        /** Coordinator node ID. */
+        private final UUID crdId;
+
+        /**
+         * @param crdId Coordinator node ID.
+         */
+        FinishState(UUID crdId) {
+            this.crdId = crdId;
+        }
+    }
 }