You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/13 14:36:19 UTC

[50/50] ignite git commit: ignite-5578

ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c9ef68e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c9ef68e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c9ef68e3

Branch: refs/heads/ignite-5578
Commit: c9ef68e3f21aeec60ee2c8605f849b581a3d6ce4
Parents: 39cccec
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 13 14:59:35 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 13 17:35:17 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 138 ++++++------
 .../processors/cache/ExchangeContext.java       |  50 ++---
 .../cache/ExchangeDiscoveryEvents.java          |  52 ++++-
 .../GridCachePartitionExchangeManager.java      |  55 ++---
 .../GridDhtPartitionsExchangeFuture.java        | 219 +++++++++++--------
 .../preloader/GridDhtPartitionsFullMessage.java |   4 +
 .../CacheExchangeCoalescingTest.java            |  73 -------
 .../distributed/CacheExchangeMergeTest.java     |  73 +++++++
 8 files changed, 361 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index a31ab1c..4ea61a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentMap;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -1232,38 +1233,45 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * @param crd Coordinator flag.
-     * @throws IgniteCheckedException If failed.
+     * @param grpId Cache group ID.
+     * @return Affinity assignments.
      */
-    public void onLocalJoin(boolean crd) throws IgniteCheckedException {
+    public GridAffinityAssignmentCache affinity(Integer grpId) {
+        CacheGroupHolder grpHolder = grpHolders.get(grpId);
+
+        assert grpHolder != null : debugGroupName(grpId);
 
+        return grpHolder.affinity();
     }
 
-    public void processDiscoveryEvents(ExchangeDiscoveryEvents evts) {
-        AffinityTopologyVersion topVer = evts.topologyVersion();
+    public Map<Integer, Map<Integer, List<UUID>>> onTopologyChange(GridDhtPartitionsExchangeFuture fut,
+        boolean crd)
+        throws IgniteCheckedException
+    {
+        final ExchangeDiscoveryEvents evts = fut.context().events();
+
+        assert evts.serverJoin() || evts.serverLeft();
 
         if (evts.serverLeft()) {
+            forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
+                @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
+                    AffinityTopologyVersion topVer = evts.topologyVersion();
 
-        }
-        else if (evts.serverJoin()) {
+                    CacheGroupHolder cache = groupHolder(topVer, desc);
 
-        }
-        else {
+                    cache.affinity().calculate(topVer, evts.event(), evts.discoveryCache());
+                }
+            });
 
+            return initAffinityOnNodeLeft0(evts.topologyVersion(), fut);
         }
-    }
-
-
-    /**
-     * @param grpId Cache group ID.
-     * @return Affinity assignments.
-     */
-    public GridAffinityAssignmentCache affinity(Integer grpId) {
-        CacheGroupHolder grpHolder = grpHolders.get(grpId);
+        else {
+            WaitRebalanceInfo waitRebalanceInfo = initAffinityOnNodeJoin(evts, crd);
 
-        assert grpHolder != null : debugGroupName(grpId);
+            setWaitRebalanceInfo(waitRebalanceInfo, evts.topologyVersion(), crd);
 
-        return grpHolder.affinity();
+            return null;
+        }
     }
 
     /**
@@ -1299,25 +1307,23 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             else
                 fetchAffinityOnJoin(fut);
         }
-        else {
-            waitRebalanceInfo = initAffinityOnNodeJoin(fut.topologyVersion(),
-                fut.discoveryEvent(),
-                fut.discoCache(),
-                crd);
-        }
+        else
+            waitRebalanceInfo = initAffinityOnNodeJoin(fut.context().events(), crd);
 
-        synchronized (mux) {
-            affCalcVer = fut.topologyVersion();
+        setWaitRebalanceInfo(waitRebalanceInfo, fut.topologyVersion(), crd);
+    }
 
-            this.waitInfo = waitRebalanceInfo != null && !waitRebalanceInfo.empty() ? waitRebalanceInfo : null;
+    private void setWaitRebalanceInfo(WaitRebalanceInfo waitRebalanceInfo, AffinityTopologyVersion topVer, boolean crd) {
+        affCalcVer = topVer;
 
-            WaitRebalanceInfo info = this.waitInfo;
+        this.waitInfo = waitRebalanceInfo != null && !waitRebalanceInfo.empty() ? waitRebalanceInfo : null;
 
-            if (crd) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Computed new affinity after node join [topVer=" + fut.topologyVersion() +
-                        ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']');
-                }
+        WaitRebalanceInfo info = this.waitInfo;
+
+        if (crd) {
+            if (log.isDebugEnabled()) {
+                log.debug("Computed new affinity after node join [topVer=" + topVer +
+                    ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']');
             }
         }
     }
@@ -1654,17 +1660,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * @param topVer Topology version.
-     * @param evt Discovery event.
-     * @param discoCache Discovery data cache.
      * @param crd Coordinator flag.
      * @throws IgniteCheckedException If failed.
      * @return Rabalance info.
      */
-    @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final AffinityTopologyVersion topVer,
-        final DiscoveryEvent evt,
-        final DiscoCache discoCache,
-        boolean crd)
+    @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final ExchangeDiscoveryEvents evts, boolean crd)
         throws IgniteCheckedException {
         final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
 
@@ -1675,23 +1675,27 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 boolean latePrimary = grp.rebalanceEnabled();
 
-                initAffinityOnNodeJoin(topVer, evt, discoCache, grp.affinity(), null, latePrimary, affCache);
+                initAffinityOnNodeJoin(evts,
+                    evts.groupAddedOnExchange(grp.groupId(), grp.receivedFrom()),
+                    grp.affinity(),
+                    null,
+                    latePrimary,
+                    affCache);
             }
 
             return null;
         }
         else {
-            final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
+            final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(evts.topologyVersion());
 
             forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
                 @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
-                    CacheGroupHolder cache = groupHolder(topVer, desc);
+                    CacheGroupHolder cache = groupHolder(evts.topologyVersion(), desc);
 
                     boolean latePrimary = cache.rebalanceEnabled;
 
-                    initAffinityOnNodeJoin(topVer,
-                        evt,
-                        discoCache,
+                    initAffinityOnNodeJoin(evts,
+                        evts.groupAddedOnExchange(desc.groupId(), desc.receivedFrom()),
                         cache.affinity(),
                         waitRebalanceInfo,
                         latePrimary,
@@ -1704,24 +1708,33 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * @param topVer Topology version.
-     * @param evt Discovery event.
-     * @param discoCache Discovery data cache.
      * @param aff Affinity.
      * @param rebalanceInfo Rebalance information.
      * @param latePrimary If {@code true} delays primary assignment if it is not owner.
      * @param affCache Already calculated assignments (to reduce data stored in history).
      * @throws IgniteCheckedException If failed.
      */
-    private void initAffinityOnNodeJoin(AffinityTopologyVersion topVer,
-        DiscoveryEvent evt,
-        DiscoCache discoCache,
+    private void initAffinityOnNodeJoin(
+        ExchangeDiscoveryEvents evts,
+        boolean addedOnExchnage,
         GridAffinityAssignmentCache aff,
         WaitRebalanceInfo rebalanceInfo,
         boolean latePrimary,
         Map<Object, List<List<ClusterNode>>> affCache)
         throws IgniteCheckedException
     {
+        if (addedOnExchnage) {
+            if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) {
+                List<List<ClusterNode>> newAff = aff.calculate(evts.topologyVersion(),
+                    evts.event(),
+                    evts.discoveryCache());
+
+                aff.initialize(evts.topologyVersion(), newAff);
+            }
+
+            return;
+        }
+
         AffinityTopologyVersion affTopVer = aff.lastVersion();
 
         assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() +
@@ -1731,7 +1744,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         assert aff.idealAssignment() != null : "Previous assignment is not available.";
 
-        List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, evt, discoCache);
+        List<List<ClusterNode>> idealAssignment = aff.calculate(evts.topologyVersion(), evts.event(), evts.discoveryCache());
         List<List<ClusterNode>> newAssignment = null;
 
         if (latePrimary) {
@@ -1743,7 +1756,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 ClusterNode newPrimary = newNodes.size() > 0 ? newNodes.get(0) : null;
 
                 if (curPrimary != null && newPrimary != null && !curPrimary.equals(newPrimary)) {
-                    assert cctx.discovery().node(topVer, curPrimary.id()) != null : curPrimary;
+                    assert cctx.discovery().node(evts.topologyVersion(), curPrimary.id()) != null : curPrimary;
 
                     List<ClusterNode> nodes0 = latePrimaryAssignment(aff,
                         p,
@@ -1762,7 +1775,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         if (newAssignment == null)
             newAssignment = idealAssignment;
 
-        aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache));
+        aff.initialize(evts.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
     }
 
     /**
@@ -1834,7 +1847,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             initFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> initFut) {
                     try {
-                        resFut.onDone(initAffinityOnNodeLeft0(fut));
+                        resFut.onDone(initAffinityOnNodeLeft0(fut.topologyVersion(), fut));
                     }
                     catch (IgniteCheckedException e) {
                         resFut.onDone(e);
@@ -1845,7 +1858,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             return resFut;
         }
         else
-            return new GridFinishedFuture<>(initAffinityOnNodeLeft0(fut));
+            return new GridFinishedFuture<>(initAffinityOnNodeLeft0(fut.topologyVersion(), fut));
     }
 
     /**
@@ -1853,10 +1866,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @return Affinity assignment.
      * @throws IgniteCheckedException If failed.
      */
-    private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
+    private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final AffinityTopologyVersion topVer,
+        final GridDhtPartitionsExchangeFuture fut)
         throws IgniteCheckedException {
-        final AffinityTopologyVersion topVer = fut.topologyVersion();
-
         final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
 
         final Collection<ClusterNode> aliveNodes = cctx.discovery().nodes(topVer);
@@ -1865,7 +1877,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
             @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
-                CacheGroupHolder grpHolder = groupHolder(fut.topologyVersion(), desc);
+                CacheGroupHolder grpHolder = groupHolder(topVer, desc);
 
                 if (!grpHolder.rebalanceEnabled)
                     return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index eeb7b23..1d7b73a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -17,14 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.jetbrains.annotations.Nullable;
 
@@ -39,24 +34,25 @@ public class ExchangeContext {
     private boolean fetchAffOnJoin;
 
     /** */
-    private final boolean coalescing;
+    private final boolean merge;
 
     /** */
-    private AffinityTopologyVersion resTopVer;
-
-    /** */
-    private final Map<Integer, List<List<ClusterNode>>> affMap = new HashMap<>();
+    private final ExchangeDiscoveryEvents evts;
 
     /**
      * @param protocolVer Protocol version.
-     * @param topVer Topology version.
+     * @param fut Exchange future.
      */
-    public ExchangeContext(int protocolVer, AffinityTopologyVersion topVer) {
+    public ExchangeContext(int protocolVer, GridDhtPartitionsExchangeFuture fut) {
         fetchAffOnJoin = protocolVer == 1;
 
-        coalescing = protocolVer > 1;
+        merge = protocolVer > 1;
+
+        evts = new ExchangeDiscoveryEvents(fut);
+    }
 
-        this.resTopVer = topVer;
+    public ExchangeDiscoveryEvents events() {
+        return evts;
     }
 
     /**
@@ -84,27 +80,7 @@ public class ExchangeContext {
         return requestGrpsAffOnJoin;
     }
 
-    public boolean coalescing() {
-        return coalescing;
-    }
-
-    public List<List<ClusterNode>> activeAffinity(GridCacheSharedContext cctx, GridAffinityAssignmentCache aff) {
-        List<List<ClusterNode>> assignment = affMap.get(aff.groupId());
-
-        if (assignment != null)
-            return assignment;
-
-        AffinityTopologyVersion affTopVer = aff.lastVersion();
-
-        assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() +
-            ", topVer=" + affTopVer + ", node=" + cctx.localNodeId() + ']';
-
-        List<List<ClusterNode>> curAff = aff.assignments(affTopVer);
-
-        assert aff.idealAssignment() != null : "Previous assignment is not available.";
-
-        affMap.put(aff.groupId(), curAff);
-
-        return curAff;
+    public boolean canMergeExchanges() {
+        return merge;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
index fced92e..7d3e256 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
@@ -17,13 +17,19 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
+import static com.sun.corba.se.impl.util.RepositoryId.cache;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -39,6 +45,12 @@ public class ExchangeDiscoveryEvents {
     private DiscoCache discoCache;
 
     /** */
+    private DiscoveryEvent evt;
+
+    /** */
+    private List<DiscoveryEvent> evts = new ArrayList<>();
+
+    /** */
     private boolean srvJoin;
 
     /** */
@@ -47,18 +59,36 @@ public class ExchangeDiscoveryEvents {
     /**
      * @param fut Future.
      */
-    void init(GridDhtPartitionsExchangeFuture fut) {
-        topVer = fut.topologyVersion();
-        discoCache = fut.discoCache();
+    ExchangeDiscoveryEvents(GridDhtPartitionsExchangeFuture fut) {
+        addEvent(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+    }
 
-        ClusterNode node = fut.discoveryEvent().eventNode();
+    boolean groupAddedOnExchange(int grpId, UUID rcvdFrom) {
+        for (DiscoveryEvent evt : evts) {
+            if (evt.type() == EVT_NODE_JOINED && rcvdFrom.equals(evt.eventNode().id()))
+                return true;
+        }
 
-        if (fut.discoveryEvent().type()== EVT_NODE_JOINED)
-            srvJoin = !CU.clientNode(node);
-        else {
-            assert fut.discoveryEvent().type() == EVT_NODE_LEFT || fut.discoveryEvent().type() == EVT_NODE_FAILED;
+        return false;
+    }
 
-            srvLeft = !CU.clientNode(node);
+    void addEvent(AffinityTopologyVersion topVer, DiscoveryEvent evt, DiscoCache cache) {
+        evts.add(evt);
+
+        this.topVer = topVer;
+        this.evt = evt;
+        this.discoCache = cache;
+
+        ClusterNode node = evt.eventNode();
+
+        if (!CU.clientNode(node)) {
+            if (evt.type()== EVT_NODE_JOINED)
+                srvJoin = true;
+            else {
+                assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
+
+                srvLeft = !CU.clientNode(node);
+            }
         }
     }
 
@@ -66,6 +96,10 @@ public class ExchangeDiscoveryEvents {
         return discoCache;
     }
 
+    DiscoveryEvent event() {
+        return evt;
+    }
+
     AffinityTopologyVersion topologyVersion() {
         return topVer;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 135b771..4e42bf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1738,28 +1738,28 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             ((IgniteDiagnosticAware)fut).addDiagnosticRequest(ctx);
     }
 
-    private boolean supportsCoalescing(ClusterNode node) {
+    private boolean supportsMergeExchanges(ClusterNode node) {
         return exchangeProtocolVersion(node.version()) > 1;
     }
 
     /** */
-    private volatile AffinityTopologyVersion coalesceTestWaitVer;
+    private volatile AffinityTopologyVersion exchMergeTestWaitVer;
 
     /**
-     * @param coalesceTestWaitVer
+     * For testing only.
+     *
+     * @param exchMergeTestWaitVer Version to wait for.
      */
-    public void coalesceTestWaitVersion(AffinityTopologyVersion coalesceTestWaitVer) {
-        this.coalesceTestWaitVer = coalesceTestWaitVer;
+    public void mergeExchangesTestWaitVersion(AffinityTopologyVersion exchMergeTestWaitVer) {
+        this.exchMergeTestWaitVer = exchMergeTestWaitVer;
     }
 
-    public ExchangeDiscoveryEvents coalesceExchanges(GridDhtPartitionsExchangeFuture curFut) {
-        ExchangeDiscoveryEvents evts = null;
-
-        AffinityTopologyVersion coalesceTestWaitVer = this.coalesceTestWaitVer;
+    public boolean mergeExchanges(GridDhtPartitionsExchangeFuture curFut) {
+        AffinityTopologyVersion exchMergeTestWaitVer = this.exchMergeTestWaitVer;
 
-        if (coalesceTestWaitVer != null) {
+        if (exchMergeTestWaitVer != null) {
             log.info("Coalesce test, waiting for version [exch=" + curFut.topologyVersion() +
-                ", waitVer=" + coalesceTestWaitVer + ']');
+                ", waitVer=" + exchMergeTestWaitVer + ']');
 
             long end = U.currentTimeMillis() + 10_000;
 
@@ -1770,8 +1770,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (task instanceof GridDhtPartitionsExchangeFuture) {
                         GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
 
-                        if (coalesceTestWaitVer.equals(fut.topologyVersion())) {
-                            log.info("Coalesce test, found awaited version: " + coalesceTestWaitVer);
+                        if (exchMergeTestWaitVer.equals(fut.topologyVersion())) {
+                            log.info("Coalesce test, found awaited version: " + exchMergeTestWaitVer);
 
                             found = true;
 
@@ -1793,40 +1793,31 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             }
         }
 
-        for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
-            if (task instanceof GridDhtPartitionsExchangeFuture) {
-                GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
+        synchronized (curFut) {
+            int awaited = 0;
 
-                int evtType = fut.discoveryEvent().type();
+            for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+                if (task instanceof GridDhtPartitionsExchangeFuture) {
+                    GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
 
-                if (evtType == EVT_NODE_JOINED) {
                     DiscoveryEvent evt = fut.discoveryEvent();
 
                     ClusterNode node = evt.eventNode();
 
-                    if (!supportsCoalescing(node))
+                    if (!supportsMergeExchanges(node))
                         break;
 
-                    fut.mergeWithFuture(curFut);
-
-                    if (evts == null)
-                        evts = new ExchangeDiscoveryEvents();
-
-                    evts.init(fut);
+                    if (evt.type() == EVT_NODE_JOINED && !CU.clientNode(node))
+                        fut.mergeServerJoinExchange(curFut);
 
                     exchWorker.futQ.remove(fut);
                 }
                 else
                     break;
-//                else if (evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED) {
-//
-//                }
             }
-            else
-                break;
-        }
 
-        return evts;
+            return awaited == 0;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 1ec3d73..463e330 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -63,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.ExchangeContext;
+import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -210,6 +212,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap8<>();
 
     /** */
+    private Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs;
+
+    /** */
+    private int awaitMergedMsgs;
+
+    /** */
     @GridToStringExclude
     private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
 
@@ -444,7 +452,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this;
 
         exchCtx = new ExchangeContext(cctx.exchange().exchangeProtocolVersion(discoCache.minimumNodeVersion()),
-            topologyVersion());
+            this);
 
         try {
             discoCache.updateAlives(cctx.discovery());
@@ -497,51 +505,26 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
             else {
-                if (exchCtx.coalescing()) {
-//                    if (discoEvt.type() == EVT_NODE_JOINED) {
-//                        if (discoEvt.eventNode().isLocal()) {
-//                            localJoin();
-//
-//                            if (crdNode) {
-//                                exchange = ExchangeType.NONE;
-//                            }
-//                            else
-//                                sendLocalJoinMessage(crd);
-//                        }
-//                        else {
-//                            if (CU.clientNode(discoEvt.eventNode())) {
-//                                onClientNodeEvent(crdNode);
-//
-//                                exchange = ExchangeType.NONE;
-//                            }
-//                            else {
-//                                if (cctx.kernalContext().clientNode())
-//                                    exchange = ExchangeType.CLIENT;
-//                                else {
-//
-//                                }
-//                            }
-//                        }
-//                    }
-//                    else {
-//
-//                    }
-                }
-                else {
-                    if (discoEvt.type() == EVT_NODE_JOINED) {
-                        if (!discoEvt.eventNode().isLocal()) {
-                            Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(
-                                discoEvt.eventNode().id(),
-                                topVer);
+                if (discoEvt.type() == EVT_NODE_JOINED) {
+                    if (!discoEvt.eventNode().isLocal()) {
+                        Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(
+                            discoEvt.eventNode().id(),
+                            topVer);
 
-                            cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
-                        }
-                        else
-                            initCachesOnLocalJoin();
+                        cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
                     }
+                    else
+                        initCachesOnLocalJoin();
+                }
 
-                    exchange = CU.clientNode(discoEvt.eventNode()) ?
-                        onClientNodeEvent(crdNode) :
+                if (exchCtx.canMergeExchanges()) {
+                    if (cctx.kernalContext().clientNode() || CU.clientNode(discoEvt.eventNode()))
+                        exchange = onClientNodeEvent(crdNode);
+                    else
+                        exchange = ExchangeType.ALL_2;
+                }
+                else {
+                    exchange = CU.clientNode(discoEvt.eventNode()) ? onClientNodeEvent(crdNode) :
                         onServerNodeEvent(crdNode);
                 }
             }
@@ -556,6 +539,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         break;
                     }
 
+                    case ALL_2: {
+                        distributedExchange2();
+
+                        break;
+                    }
+
                     case CLIENT: {
                         initTopologies();
 
@@ -874,6 +863,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /**
      * @throws IgniteCheckedException If failed.
      */
+    private void distributedExchange2() throws IgniteCheckedException {
+        if (crd.isLocal()) {
+            if (remaining.isEmpty())
+                onAllReceived();
+        }
+        else
+            sendPartitions(crd);
+
+        initDone();
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
     private void distributedExchange() throws IgniteCheckedException {
         assert crd != null;
 
@@ -914,11 +917,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         }
 
         cctx.database().beforeExchange(this);
-//
-//        ExchangeDiscoveryEvents mergedEvts = null;
-//
-//        if (crd.isLocal())
-//            mergedEvts = cctx.exchange().coalesceExchanges(this);
 
         if (crd.isLocal()) {
             if (remaining.isEmpty())
@@ -1453,59 +1451,63 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     private GridDhtPartitionsExchangeFuture mergedWith;
 
     /** */
-    private List<T2<UUID, GridDhtPartitionsSingleMessage>> pendingMsgs;
+    private GridDhtPartitionsSingleMessage pendingSingleMsg;
+
+    /** */
+    private Map<ClusterNode, GridDhtPartitionsSingleMessage> pendingClientMsgs;
+
+    private void addMergedJoinExchange(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
+        if (mergedJoinExchMsgs == null)
+            mergedJoinExchMsgs = new LinkedHashMap<>();
+
+        if (msg != null)
+            mergedJoinExchMsgs.put(nodeId, msg);
+        else {
+            if (cctx.discovery().alive(nodeId))
+                awaitMergedMsgs++;
+            else
+                mergedJoinExchMsgs.put(nodeId, null);
+        }
+    }
 
     /**
      * @param fut Current exchange to merge with.
      */
     @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-    public void mergeWithFuture(final GridDhtPartitionsExchangeFuture fut) {
+    public void mergeServerJoinExchange(final GridDhtPartitionsExchangeFuture fut) {
         log.info("Merge exchange future [fut=" + topologyVersion() + ", mergeWith=" + fut.topologyVersion() + ']');
 
-        List<T2<UUID, GridDhtPartitionsSingleMessage>> pendingMsgs = null;
-
         synchronized (this) {
-            synchronized (fut) {
-                assert !isDone();
-                assert !initFut.isDone();
-                assert mergedWith == null;
+            assert !isDone();
+            assert !initFut.isDone();
+            assert mergedWith == null;
+            assert !CU.clientNode(discoEvt.eventNode()) : discoEvt;
 
-                mergedWith = fut;
+            mergedWith = fut;
 
-                if (this.pendingMsgs != null) {
-                    pendingMsgs = this.pendingMsgs;
+            fut.addMergedJoinExchange(discoEvt.eventNode().id(), pendingSingleMsg);
 
-                    T2<UUID, GridDhtPartitionsSingleMessage> joinedSrvMsg = null;
+            // TODO 5578 client messages.
+        }
+    }
 
-                    if (discoEvt.type() == EVT_NODE_JOINED && !CU.clientNode(discoEvt.eventNode())) {
-                        for (Iterator<T2<UUID, GridDhtPartitionsSingleMessage>> it = pendingMsgs.iterator(); it.hasNext();) {
-                            T2<UUID, GridDhtPartitionsSingleMessage> msg = it.next();
+    void onReceiveMerged(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
+        boolean done = false;
 
-                            if (msg.get1().equals(discoEvt.eventNode().id())) {
-                                joinedSrvMsg = msg;
+        synchronized (this) {
+            if (mergedJoinExchMsgs != null && !mergedJoinExchMsgs.containsKey(node.id())) {
+                mergedJoinExchMsgs.put(node.id(), msg);
 
-                                it.remove();
+                assert awaitMergedMsgs > 0 : awaitMergedMsgs;
 
-                                break;
-                            }
-                        }
+                awaitMergedMsgs--;
 
-                        if (pendingMsgs.isEmpty())
-                            pendingMsgs = null;
-                    }
-                }
+                done = awaitMergedMsgs == 0;
             }
         }
 
-        if (pendingMsgs != null) {
-            final List<T2<UUID, GridDhtPartitionsSingleMessage>> pendingMsgs0 = pendingMsgs;
+        if (done) {
 
-            fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut0) {
-                    for (T2<UUID, GridDhtPartitionsSingleMessage> msg : pendingMsgs0)
-                        fut.processSingleMessage(msg.get1(), msg.get2());
-                }
-            });
         }
     }
 
@@ -1516,7 +1518,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     public void onReceive(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
         assert msg != null;
         assert exchId.equals(msg.exchangeId()) : msg;
-        assert msg.lastVersion() != null : msg;
 
         if (isDone()) {
             if (log.isDebugEnabled())
@@ -1537,15 +1538,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 if (mergedWith != null)
                     mergedWith0 = mergedWith;
                 else {
-                    if (pendingMsgs == null)
-                        pendingMsgs = new ArrayList<>();
-
-                    pendingMsgs.add(new T2<>(node.id(), msg));
+                    if (msg.client()) {
+                        assert false;
+                    }
+                    else if (exchangeId().isJoined() && node.id().equals(exchId.nodeId()))
+                        pendingSingleMsg = msg;
                 }
             }
 
             if (mergedWith0 != null) {
-                mergedWith0.onReceive(node, msg);
+                mergedWith0.onReceiveMerged(node, msg);
 
                 return;
             }
@@ -1857,6 +1859,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
+            if (exchCtx.canMergeExchanges()) {
+                cctx.exchange().mergeExchanges(this);
+
+                cctx.affinity().onTopologyChange(this, true);
+
+                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                    if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
+                        continue;
+
+                    grp.topology().beforeExchange(this, true);
+                }
+            }
+
             Map<Integer, CacheGroupAffinityMessage> cachesAff = null;
 
             for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
@@ -2160,7 +2175,24 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 return;
             }
 
-            finishState = new FinishState(crd.id());
+            finishState = new FinishState(crd.id(), msg.resultTopologyVersion());
+        }
+
+        if (exchCtx.canMergeExchanges()) {
+            try {
+                onServerNodeEvent(true);
+
+                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                    if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
+                        continue;
+
+                    grp.topology().beforeExchange(this, true);
+                }
+            }
+            catch (IgniteCheckedException e) {
+                // TODO 5578.
+                U.error(log, "Failed: " + e, e);
+            }
         }
 
         Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin();
@@ -2568,8 +2600,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     enum ExchangeType {
         /** */
         CLIENT,
+
         /** */
         ALL,
+
+        /** */
+        ALL_2,
+
         /** */
         NONE
     }
@@ -2680,11 +2717,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         /** */
         private final UUID crdId;
 
+        /** */
+        private final AffinityTopologyVersion topVer;
+
         /**
          * @param crdId Coordinator node.
          */
-        FinishState(UUID crdId) {
+        FinishState(UUID crdId, AffinityTopologyVersion topVer) {
             this.crdId = crdId;
+            this.topVer = topVer;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 4c79c3b..8a5dbbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -154,6 +154,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         cp.cachesAff = cachesAff;
     }
 
+    AffinityTopologyVersion resultTopologyVersion() {
+        return resTopVer;
+    }
+
     /**
      * @param cachesAff Affinity.
      * @return Message copy.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
deleted file mode 100644
index dbd3971..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- *
- */
-public class CacheExchangeCoalescingTest extends GridCommonAbstractTest {
-    /** */
-    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private boolean client;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
-
-        cfg.setClientMode(client);
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testConcurrentJoin1() throws Exception {
-        IgniteEx srv0 = startGrid(0);
-
-        srv0.context().cache().context().exchange().coalesceTestWaitVersion(new AffinityTopologyVersion(3, 0));
-
-        final AtomicInteger idx = new AtomicInteger(1);
-
-        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                startGrid(idx.getAndIncrement());
-
-                return null;
-            }
-        }, 2, "start-node");
-
-        fut.get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
new file mode 100644
index 0000000..ef8a1da
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class CacheExchangeMergeTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentJoin1() throws Exception {
+        IgniteEx srv0 = startGrid(0);
+
+        srv0.context().cache().context().exchange().mergeExchangesTestWaitVersion(new AffinityTopologyVersion(3, 0));
+
+        final AtomicInteger idx = new AtomicInteger(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                startGrid(idx.getAndIncrement());
+
+                return null;
+            }
+        }, 2, "start-node");
+
+        fut.get();
+    }
+}