You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/03 12:55:43 UTC

[3/4] ignite git commit: ignite-5578

ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e3ecc169
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e3ecc169
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e3ecc169

Branch: refs/heads/ignite-5578
Commit: e3ecc169ceab543748c9ba3ef409540697cf2529
Parents: b1579ea
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 3 15:45:23 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 3 15:45:23 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 140 ++++++++-----------
 .../processors/cache/ExchangeContext.java       |   4 +-
 .../cache/ExchangeDiscoveryEvents.java          |   6 +-
 .../GridCachePartitionExchangeManager.java      |  26 ++--
 .../dht/GridClientPartitionTopology.java        |   2 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   4 +-
 .../GridDhtPartitionsExchangeFuture.java        | 139 +++++++++---------
 .../dht/preloader/InitNewCoordinatorFuture.java |   2 +-
 .../GridCacheDatabaseSharedManager.java         |   2 +-
 .../CacheLateAffinityAssignmentTest.java        |   2 +-
 10 files changed, 154 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index e6f5d6b..dc905a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -759,7 +759,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         evts.topologyVersion());
 
                     if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
-                        if (fut.discoCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())
+                        if (fut.events().discoveryCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())
                             U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
                     }
                 }
@@ -957,7 +957,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 IgniteUuid deploymentId = desc.deploymentId();
 
                 if (!deploymentId.equals(deploymentIds.get(aff.groupId()))) {
-                    aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer);
+                    aff.clientEventTopologyChange(exchFut.firstEvent(), topVer);
 
                     return;
                 }
@@ -991,7 +991,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     aff.initialize(topVer, cachedAssignment(aff, assignment, affCache));
                 }
                 else
-                    aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer);
+                    aff.clientEventTopologyChange(exchFut.firstEvent(), topVer);
             }
         });
     }
@@ -1004,14 +1004,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      */
     public void onClientEvent(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
-        boolean locJoin = fut.discoveryEvent().eventNode().isLocal();
+        boolean locJoin = fut.firstEvent().eventNode().isLocal();
 
         if (!locJoin) {
             forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                 @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                     AffinityTopologyVersion topVer = fut.initialVersion();
 
-                    aff.clientEventTopologyChange(fut.discoveryEvent(), topVer);
+                    aff.clientEventTopologyChange(fut.firstEvent(), topVer);
                 }
             });
         }
@@ -1111,11 +1111,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             assert old == null : old;
 
-            List<List<ClusterNode>> newAff = grpHolder.affinity().calculate(fut.initialVersion(),
-                fut.discoveryEvent(),
-                fut.discoCache());
-
-            grpHolder.affinity().initialize(fut.initialVersion(), newAff);
+            calculateAndInit(fut.events(), grpHolder.affinity(), fut.initialVersion());
         }
         else if (grpHolder.client() && grp != null) {
             assert grpHolder.affinity().idealAssignment() != null;
@@ -1149,12 +1145,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
                     CacheGroupHolder cache = groupHolder(fut.initialVersion(), desc);
 
-                    if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
-                        List<List<ClusterNode>> assignment =
-                            cache.affinity().calculate(fut.initialVersion(), fut.discoveryEvent(), fut.discoCache());
-
-                        cache.affinity().initialize(fut.initialVersion(), assignment);
-                    }
+                    if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE))
+                        calculateAndInit(fut.events(), cache.affinity(), fut.initialVersion());
                 }
             });
         }
@@ -1182,13 +1174,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         ExchangeDiscoveryEvents evts = fut.context().events();
 
-        if (canCalculateAffinity(desc, aff, fut)) {
-            List<List<ClusterNode>> assignment = aff.calculate(evts.topologyVersion(),
-                evts.lastEvent(),
-                evts.discoveryCache());
-
-            aff.initialize(evts.topologyVersion(), assignment);
-        }
+        if (canCalculateAffinity(desc, aff, fut))
+            calculateAndInit(evts, aff, evts.topologyVersion());
         else {
             GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
                 desc.groupId(),
@@ -1220,7 +1207,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             return true;
 
         // If local node did not initiate exchange or local node is the only cache node in grid.
-        Collection<ClusterNode> affNodes = fut.discoCache().cacheGroupAffinityNodes(aff.groupId());
+        Collection<ClusterNode> affNodes = fut.events().discoveryCache().cacheGroupAffinityNodes(aff.groupId());
 
         return fut.cacheGroupAddedOnExchange(aff.groupId(), desc.receivedFrom()) ||
             !fut.exchangeId().nodeId().equals(cctx.localNodeId()) ||
@@ -1330,13 +1317,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                     aff.initialize(evts.topologyVersion(), assignments);
                 }
-                else if (fut.cacheGroupAddedOnExchange(aff.groupId(), grp.receivedFrom())) {
-                    List<List<ClusterNode>> assignment = aff.calculate(evts.topologyVersion(),
-                        evts.lastEvent(),
-                        evts.discoveryCache());
-
-                    aff.initialize(evts.topologyVersion(), assignment);
-                }
+                else if (fut.cacheGroupAddedOnExchange(aff.groupId(), grp.receivedFrom()))
+                    calculateAndInit(evts, aff, evts.topologyVersion());
 
                 grp.topology().initPartitionsWhenAffinityReady(resTopVer, fut);
             }
@@ -1412,9 +1394,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      */
     public void onServerJoin(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
-        assert !fut.discoveryEvent().eventNode().isClient();
+        assert !fut.firstEvent().eventNode().isClient();
 
-        boolean locJoin = fut.discoveryEvent().eventNode().isLocal();
+        boolean locJoin = fut.firstEvent().eventNode().isLocal();
 
         WaitRebalanceInfo waitRebalanceInfo = null;
 
@@ -1424,13 +1406,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
                         AffinityTopologyVersion topVer = fut.initialVersion();
 
-                        CacheGroupHolder cache = groupHolder(topVer, desc);
-
-                        List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer,
-                            fut.discoveryEvent(),
-                            fut.discoCache());
+                        CacheGroupHolder grpHolder = groupHolder(topVer, desc);
 
-                        cache.affinity().initialize(topVer, newAff);
+                        calculateAndInit(fut.events(), grpHolder.affinity(), topVer);
                     }
                 });
             }
@@ -1485,6 +1463,22 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param evts Discovery events.
+     * @param aff Affinity.
+     * @param topVer Topology version.
+     */
+    private void calculateAndInit(ExchangeDiscoveryEvents evts,
+        GridAffinityAssignmentCache aff,
+        AffinityTopologyVersion topVer)
+    {
+        List<List<ClusterNode>> assignment = aff.calculate(topVer,
+            evts.lastEvent(),
+            evts.discoveryCache());
+
+        aff.initialize(topVer, assignment);
+    }
+
+    /**
      * @param fut Exchange future.
      * @throws IgniteCheckedException If failed.
      */
@@ -1498,14 +1492,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 continue;
 
             if (fut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom())) {
-                // Do not calculate affinity since it can change in case of exchange merge.
-                if (!fut.context().mergeExchanges()) {
-                    List<List<ClusterNode>> assignment = grp.affinity().calculate(topVer,
-                            fut.discoveryEvent(),
-                            fut.discoCache());
-
-                    grp.affinity().initialize(topVer, assignment);
-                }
+                // In case if merge is allowed do not calculate affinity since it can change on exchange end.
+                if (!fut.context().mergeExchanges())
+                    calculateAndInit(fut.events(), grp.affinity(), topVer);
             }
             else {
                 if (fut.context().fetchAffinityOnJoin()) {
@@ -1516,22 +1505,17 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
                         grpDesc.groupId(),
                         topVer,
-                        fut.discoCache());
+                        fut.events().discoveryCache());
 
                     fetchFut.init(false);
 
                     fetchFuts.add(fetchFut);
                 }
                 else {
-                    if (fut.discoCache().serverNodes().size() > 0)
+                    if (fut.events().discoveryCache().serverNodes().size() > 0)
                         fut.context().addGroupAffinityRequestOnJoin(grp.groupId());
-                    else {
-                        List<List<ClusterNode>> aff = grp.affinity().calculate(topVer,
-                            fut.discoveryEvent(),
-                            fut.discoCache());
-
-                        grp.affinity().initialize(topVer, aff);
-                    }
+                    else
+                        calculateAndInit(fut.events(), grp.affinity(), topVer);
                 }
             }
         }
@@ -1542,8 +1526,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             int grpId = fetchFut.groupId();
 
             fetchAffinity(topVer,
-                fut.discoveryEvent(),
-                fut.discoCache(),
+                fut.events().lastEvent(),
+                fut.events().discoveryCache(),
                 cctx.cache().cacheGroup(grpId).affinity(),
                 fetchFut);
         }
@@ -1603,7 +1587,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @return {@code True} if affinity should be assigned by coordinator.
      */
     public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
-        ClusterNode leftNode = fut.discoveryEvent().eventNode();
+        ClusterNode leftNode = fut.firstEvent().eventNode();
 
         assert !leftNode.isClient() : leftNode;
 
@@ -1613,14 +1597,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
                     CacheGroupHolder cache = groupHolder(fut.initialVersion(), desc);
 
-                    cache.aff.calculate(fut.initialVersion(), fut.discoveryEvent(), fut.discoCache());
+                    cache.aff.calculate(fut.initialVersion(), fut.firstEvent(), fut.firstEventCache());
                 }
             });
         }
         else {
             forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                 @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
-                    aff.calculate(fut.initialVersion(), fut.discoveryEvent(), fut.discoCache());
+                    aff.calculate(fut.initialVersion(), fut.firstEvent(), fut.firstEventCache());
                 }
             });
         }
@@ -1670,12 +1654,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     final GridAffinityAssignmentCache aff = grpHolder.affinity();
 
                     if (newAff) {
-                        if (!aff.lastVersion().equals(topVer)) {
-                            List<List<ClusterNode>> assign =
-                                aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
-
-                            aff.initialize(topVer, assign);
-                        }
+                        if (!aff.lastVersion().equals(topVer))
+                            calculateAndInit(fut.events(), aff, topVer);
 
                         grpHolder.topology().beforeExchange(fut, true, false);
                     }
@@ -1700,7 +1680,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
                             desc.groupId(),
                             prev.topologyVersion(),
-                            prev.discoCache());
+                            prev.events().discoveryCache());
 
                         fetchFut.init(false);
 
@@ -1710,12 +1690,12 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                             @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
                                 throws IgniteCheckedException {
                                 fetchAffinity(prev.topologyVersion(),
-                                    prev.discoveryEvent(),
-                                    prev.discoCache(),
+                                    prev.events().lastEvent(),
+                                    prev.events().discoveryCache(),
                                     aff,
                                     (GridDhtAssignmentFetchFuture)fetchFut);
 
-                                aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
+                                aff.calculate(topVer, fut.events().lastEvent(), fut.events().discoveryCache());
 
                                 affFut.onDone(topVer);
                             }
@@ -1730,13 +1710,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     if (newAff) {
                         GridAffinityAssignmentCache aff = grpHolder.affinity();
 
-                        if (!aff.lastVersion().equals(topVer)) {
-                            List<List<ClusterNode>> assign = aff.calculate(topVer,
-                                fut.discoveryEvent(),
-                                fut.discoCache());
-
-                            aff.initialize(topVer, assign);
-                        }
+                        if (!aff.lastVersion().equals(topVer))
+                            calculateAndInit(fut.events(), aff, topVer);
 
                         grpHolder.topology().beforeExchange(fut, true, false);
                     }
@@ -1908,13 +1883,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         throws IgniteCheckedException
     {
         if (addedOnExchnage) {
-            if (!aff.lastVersion().equals(evts.topologyVersion())) {
-                List<List<ClusterNode>> newAff = aff.calculate(evts.topologyVersion(),
-                    evts.lastEvent(),
-                    evts.discoveryCache());
-
-                aff.initialize(evts.topologyVersion(), newAff);
-            }
+            if (!aff.lastVersion().equals(evts.topologyVersion()))
+                calculateAndInit(evts, aff, evts.topologyVersion());
 
             return;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index 94040ba..6442752 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -56,7 +56,7 @@ public class ExchangeContext {
      * @param fut Exchange future.
      */
     public ExchangeContext(boolean crd, GridDhtPartitionsExchangeFuture fut) {
-        int protocolVer = exchangeProtocolVersion(fut.discoCache().minimumNodeVersion());
+        int protocolVer = exchangeProtocolVersion(fut.firstEventCache().minimumNodeVersion());
 
         if (compatibilityNode || (crd && fut.localJoinExchange())) {
             fetchAffOnJoin = true;
@@ -71,7 +71,7 @@ public class ExchangeContext {
 
             merge = !startCaches &&
                 protocolVer > 1 &&
-                fut.discoveryEvent().type() != EVT_DISCOVERY_CUSTOM_EVT;
+                fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT;
         }
 
         evts = new ExchangeDiscoveryEvents(fut);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
index ff61701..d4fbe60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
@@ -71,7 +71,7 @@ public class ExchangeDiscoveryEvents {
      * @param fut Current exchange future.
      */
     ExchangeDiscoveryEvents(GridDhtPartitionsExchangeFuture fut) {
-        addEvent(fut.initialVersion(), fut.discoveryEvent(), fut.discoCache());
+        addEvent(fut.initialVersion(), fut.firstEvent(), fut.firstEventCache());
     }
 
     /**
@@ -106,7 +106,7 @@ public class ExchangeDiscoveryEvents {
      * @return Last server join/fail event version.
      */
     AffinityTopologyVersion lastServerEventVersion() {
-        assert srvEvtTopVer != null;
+        assert srvEvtTopVer != null : this;
 
         return srvEvtTopVer;
     }
@@ -169,7 +169,7 @@ public class ExchangeDiscoveryEvents {
     /**
      * @return Last event.
      */
-    DiscoveryEvent lastEvent() {
+    public DiscoveryEvent lastEvent() {
         return lastSrvEvt != null ? lastSrvEvt : lastEvt;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 6efd19e..75ddacd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1796,15 +1796,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                 log.info("Merge exchange future on finish [curFut=" + curFut.initialVersion() +
                     ", mergedFut=" + fut.initialVersion() +
-                    ", evt=" + IgniteUtils.gridEventName(fut.discoveryEvent().type()) +
-                    ", evtNode=" + fut.discoveryEvent().eventNode().id()+
-                    ", evtNodeClient=" + CU.clientNode(fut.discoveryEvent().eventNode())+ ']');
+                    ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
+                    ", evtNode=" + fut.firstEvent().eventNode().id()+
+                    ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
 
-                DiscoveryEvent evt = fut.discoveryEvent();
+                DiscoveryEvent evt = fut.firstEvent();
 
                 curFut.context().events().addEvent(fut.initialVersion(),
-                    fut.discoveryEvent(),
-                    fut.discoCache());
+                    fut.firstEvent(),
+                    fut.firstEventCache());
 
                 if (evt.type() == EVT_NODE_JOINED) {
                     final GridDhtPartitionsSingleMessage pendingMsg = fut.mergeJoinExchangeOnDone(curFut);
@@ -1888,7 +1888,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 if (task instanceof GridDhtPartitionsExchangeFuture) {
                     GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
 
-                    DiscoveryEvent evt = fut.discoveryEvent();
+                    DiscoveryEvent evt = fut.firstEvent();
 
                     if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
                         log.info("Stop merge, custom event found: " + evt);
@@ -1911,13 +1911,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     log.info("Merge exchange future [curFut=" + curFut.initialVersion() +
                         ", mergedFut=" + fut.initialVersion() +
-                        ", evt=" + IgniteUtils.gridEventName(fut.discoveryEvent().type()) +
-                        ", evtNode=" + fut.discoveryEvent().eventNode().id() +
-                        ", evtNodeClient=" + CU.clientNode(fut.discoveryEvent().eventNode())+ ']');
+                        ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) +
+                        ", evtNode=" + fut.firstEvent().eventNode().id() +
+                        ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']');
 
                     curFut.context().events().addEvent(fut.initialVersion(),
-                        fut.discoveryEvent(),
-                        fut.discoCache());
+                        fut.firstEvent(),
+                        fut.firstEventCache());
 
                     if (evt.type() == EVT_NODE_JOINED) {
                         if (fut.mergeJoinExchange(curFut))
@@ -2214,7 +2214,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             boolean newCrd = false;
 
                             if (!crd) {
-                                List<ClusterNode> srvNodes = exchFut.discoCache().serverNodes();
+                                List<ClusterNode> srvNodes = exchFut.firstEventCache().serverNodes();
 
                                 crd = newCrd = !srvNodes.isEmpty() && srvNodes.get(0).isLocal();
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5a16bc9..12f3fb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -258,7 +258,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             if (stopping)
                 return;
 
-            discoCache = exchFut.discoCache();
+            discoCache = exchFut.events().discoveryCache();
 
             beforeExchange0(loc, exchFut);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index e99ad2c..5e58502 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -518,7 +518,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                     if (grp.affinityNode()) {
                         if (grpStarted ||
-                            exchFut.discoveryEvent().type() == EVT_DISCOVERY_CUSTOM_EVT ||
+                            exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT ||
                             exchFut.serverNodeDiscoveryEvent()) {
                             if (affReady) {
                                 assert grp.affinity().lastVersion().equals(evts.topologyVersion());
@@ -626,7 +626,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                 changed = true;
 
                                 if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
-                                    DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+                                    DiscoveryEvent discoEvt = exchFut.events().lastEvent();
 
                                     grp.addRebalanceEvent(p,
                                         EVT_CACHE_REBALANCE_PART_DATA_LOST,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index be6acf4..3d7749b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -125,10 +125,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
     /** */
     @GridToStringExclude
-    private volatile DiscoCache discoCache;
+    private volatile DiscoCache firstEvtDiscoCache;
 
-    /** Discovery event. */
-    private volatile DiscoveryEvent discoEvt;
+    /** Discovery event triggered this exchange. */
+    private volatile DiscoveryEvent firstDiscoEvt;
 
     /** */
     @GridToStringExclude
@@ -174,7 +174,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** Last committed cache version before next topology version use. */
     private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
 
-    /** */
+    /**
+     * Message received from node joining cluster, needed if this join exchange
+     * is merged with previous one.
+     */
     @GridToStringExclude
     private GridDhtPartitionsSingleMessage pendingJoinMsg;
 
@@ -222,11 +225,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** */
     private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap8<>();
 
-    /** */
+    /** Single messages from merged 'node join' exchanges. */
     @GridToStringExclude
     private Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs;
 
-    /** */
+    /** Number of awaited messages for merged 'node join' exchanges. */
     @GridToStringExclude
     private int awaitMergedMsgs;
 
@@ -255,7 +258,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private FinishState finishState;
 
-    /** */
+    /** Initialized when node becomes new coordinator. */
     @GridToStringExclude
     private InitNewCoordinatorFuture newCrdFut;
 
@@ -371,15 +374,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @return Discovery cache.
-     *
-     * TODO 5578 review usages, rename initialDiscoveryEvent
-     */
-    public DiscoCache discoCache() {
-        return discoCache;
-    }
-
-    /**
      * @param cacheId Cache ID.
      * @param rcvdFrom Node ID cache was received from.
      * @return {@code True} if cache was added during this exchange.
@@ -431,8 +425,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         assert exchId.equals(this.exchId);
 
         this.exchId.discoveryEvent(discoEvt);
-        this.discoEvt = discoEvt;
-        this.discoCache = discoCache;
+        this.firstDiscoEvt= discoEvt;
+        this.firstEvtDiscoCache = discoCache;
 
         evtLatch.countDown();
     }
@@ -459,12 +453,25 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @return Discovery event.
+     * @return First event discovery event.
      *
-     * TODO 5578 review usages, rename initialDiscoveryEvent
      */
-    public DiscoveryEvent discoveryEvent() {
-        return discoEvt;
+    public DiscoveryEvent firstEvent() {
+        return firstDiscoEvt;
+    }
+
+    /**
+     * @return Discovery cache for first event.
+     */
+    public DiscoCache firstEventCache() {
+        return firstEvtDiscoCache;
+    }
+
+    /**
+     * @return Events processed in this exchange.
+     */
+    public ExchangeDiscoveryEvents events() {
+        return exchCtx.events();
     }
 
     /**
@@ -525,13 +532,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         U.await(evtLatch);
 
-        assert discoEvt != null : this;
-        assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this;
+        assert firstDiscoEvt != null : this;
+        assert exchId.nodeId().equals(firstDiscoEvt.eventNode().id()) : this;
 
         try {
             AffinityTopologyVersion topVer = initialVersion();
 
-            srvNodes = new ArrayList<>(discoCache.serverNodes());
+            srvNodes = new ArrayList<>(firstEvtDiscoCache.serverNodes());
 
             remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
 
@@ -551,18 +558,18 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (exchLog.isInfoEnabled()) {
                 exchLog.info("Started exchange init [topVer=" + topVer +
                     ", crd=" + crdNode +
-                    ", evt=" + IgniteUtils.gridEventName(discoEvt.type()) +
-                    ", evtNode=" + discoEvt.eventNode().id() +
-                    ", customEvt=" + (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT ? ((DiscoveryCustomEvent) discoEvt).customMessage() : null) +
+                    ", evt=" + IgniteUtils.gridEventName(firstDiscoEvt.type()) +
+                    ", evtNode=" + firstDiscoEvt.eventNode().id() +
+                    ", customEvt=" + (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT ? ((DiscoveryCustomEvent)firstDiscoEvt).customMessage() : null) +
                     ", allowMerge=" + exchCtx.mergeExchanges() + ']');
             }
 
             ExchangeType exchange;
 
-            if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+            if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
                 assert !exchCtx.mergeExchanges();
 
-                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)discoEvt).customMessage();
+                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)firstDiscoEvt).customMessage();
 
                 if (msg instanceof ChangeGlobalStateMessage) {
                     assert exchActions != null && !exchActions.empty();
@@ -575,7 +582,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     exchange = onCacheChangeRequest(crdNode);
                 }
                 else if (msg instanceof SnapshotDiscoveryMessage) {
-                    exchange = CU.clientNode(discoEvt.eventNode()) ?
+                    exchange = CU.clientNode(firstDiscoEvt.eventNode()) ?
                         onClientNodeEvent(crdNode) :
                         onServerNodeEvent(crdNode);
                 }
@@ -588,10 +595,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 initCoordinatorCaches(newCrd);
             }
             else {
-                if (discoEvt.type() == EVT_NODE_JOINED) {
-                    if (!discoEvt.eventNode().isLocal()) {
+                if (firstDiscoEvt.type() == EVT_NODE_JOINED) {
+                    if (!firstDiscoEvt.eventNode().isLocal()) {
                         Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(
-                            discoEvt.eventNode().id(),
+                            firstDiscoEvt.eventNode().id(),
                             topVer);
 
                         cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
@@ -616,7 +623,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         }
                     }
                     else {
-                        if (CU.clientNode(discoEvt.eventNode()))
+                        if (CU.clientNode(firstDiscoEvt.eventNode()))
                             exchange = onClientNodeEvent(crdNode);
                         else
                             exchange = cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
@@ -626,7 +633,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         onLeft();
                 }
                 else {
-                    exchange = CU.clientNode(discoEvt.eventNode()) ? onClientNodeEvent(crdNode) :
+                    exchange = CU.clientNode(firstDiscoEvt.eventNode()) ? onClientNodeEvent(crdNode) :
                         onServerNodeEvent(crdNode);
                 }
             }
@@ -761,13 +768,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             top.updateTopologyVersion(
                 this,
-                discoCache(),
+                events().discoveryCache(),
                 updSeq,
                 cacheGroupStopping(grp.groupId()));
         }
 
         for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
-            top.updateTopologyVersion(this, discoCache(), -1, cacheGroupStopping(top.groupId()));
+            top.updateTopologyVersion(this, events().discoveryCache(), -1, cacheGroupStopping(top.groupId()));
     }
 
     /**
@@ -892,19 +899,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @return Exchange type.
      */
     private ExchangeType onClientNodeEvent(boolean crd) throws IgniteCheckedException {
-        assert CU.clientNode(discoEvt.eventNode()) : this;
+        assert CU.clientNode(firstDiscoEvt.eventNode()) : this;
 
-        if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED) {
+        if (firstDiscoEvt.type() == EVT_NODE_LEFT || firstDiscoEvt.type() == EVT_NODE_FAILED) {
             onLeft();
 
-            assert !discoEvt.eventNode().isLocal() : discoEvt;
+            assert !firstDiscoEvt.eventNode().isLocal() : firstDiscoEvt;
         }
         else
-            assert discoEvt.type() == EVT_NODE_JOINED || discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt;
+            assert firstDiscoEvt.type() == EVT_NODE_JOINED || firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : firstDiscoEvt;
 
         cctx.affinity().onClientEvent(this, crd);
 
-        return discoEvt.eventNode().isLocal() ? ExchangeType.CLIENT : ExchangeType.NONE;
+        return firstDiscoEvt.eventNode().isLocal() ? ExchangeType.CLIENT : ExchangeType.NONE;
     }
 
     /**
@@ -913,9 +920,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @return Exchange type.
      */
     private ExchangeType onServerNodeEvent(boolean crd) throws IgniteCheckedException {
-        assert !CU.clientNode(discoEvt.eventNode()) : this;
+        assert !CU.clientNode(firstDiscoEvt.eventNode()) : this;
 
-        if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED) {
+        if (firstDiscoEvt.type() == EVT_NODE_LEFT || firstDiscoEvt.type() == EVT_NODE_FAILED) {
             onLeft();
 
             exchCtx.events().warnNoAffinityNodes(cctx);
@@ -979,7 +986,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         waitPartitionRelease();
 
-        boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
+        boolean topChanged = firstDiscoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (cacheCtx.isLocal() || cacheStopping(cacheCtx.cacheId()))
@@ -1021,7 +1028,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         try {
             long start = U.currentTimeMillis();
 
-            IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(discoEvt);
+            IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(events().lastEvent());
 
             if (fut != null) {
                 fut.get();
@@ -1190,7 +1197,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @return {@code True} if exchange for local node join.
      */
     public boolean localJoinExchange() {
-        return discoEvt.type() == EVT_NODE_JOINED && discoEvt.eventNode().isLocal();
+        return firstDiscoEvt.type() == EVT_NODE_JOINED && firstDiscoEvt.eventNode().isLocal();
     }
 
     /**
@@ -1423,7 +1430,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             Map<Integer, CacheValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
 
             for (CacheGroupContext grp : cctx.cache().cacheGroups())
-                m.put(grp.groupId(), validateCacheGroup(grp, discoEvt.topologyNodes()));
+                m.put(grp.groupId(), validateCacheGroup(grp, events().lastEvent().topologyNodes()));
 
             grpValidRes = m;
         }
@@ -1483,8 +1490,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             exchActions = null;
 
-            if (discoEvt instanceof DiscoveryCustomEvent)
-                ((DiscoveryCustomEvent)discoEvt).customMessage(null);
+            if (firstDiscoEvt instanceof DiscoveryCustomEvent)
+                ((DiscoveryCustomEvent)firstDiscoEvt).customMessage(null);
 
             if (err == null)
                 cctx.exchange().lastFinishedFuture(this);
@@ -1528,9 +1535,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Records that this exchange if merged with another 'node join' exchange.
+     *
      * @param node Joined node.
      * @param msg Joined node message if already received.
-     * @return {@code True} if need wait for message from joined server node.
+     * @return {@code True} if need to wait for message from joined server node.
      */
     private boolean addMergedJoinExchange(ClusterNode node, @Nullable GridDhtPartitionsSingleMessage msg) {
         assert Thread.holdsLock(mux);
@@ -1582,6 +1591,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Merges this exchange with given one.
+     *
      * @param fut Current exchange to merge with.
      * @return {@code True} if need wait for message from joined server node.
      */
@@ -1596,7 +1607,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             mergedWith = fut;
 
-            ClusterNode joinedNode = discoEvt.eventNode();
+            ClusterNode joinedNode = firstDiscoEvt.eventNode();
 
             wait = fut.addMergedJoinExchange(joinedNode, pendingJoinMsg);
         }
@@ -1784,7 +1795,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
 
                 if (finishState0 == null) {
-                    assert discoEvt.type() == EVT_NODE_JOINED && CU.clientNode(discoEvt.eventNode()) : this;
+                    assert firstDiscoEvt.type() == EVT_NODE_JOINED && CU.clientNode(firstDiscoEvt.eventNode()) : this;
 
                     finishState0 = new FinishState(cctx.localNodeId(),
                         initialVersion(),
@@ -2093,7 +2104,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                 if (!grp.isLocal()) {
-                    boolean detectedOnGrp = grp.topology().detectLostPartitions(resTopVer, discoEvt);
+                    boolean detectedOnGrp = grp.topology().detectLostPartitions(resTopVer, events().lastEvent());
 
                     detected |= detectedOnGrp;
                 }
@@ -2130,7 +2141,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     *
+     * @param sndResNodes Additional nodes to send finish message to.
      */
     private void onAllReceived(@Nullable Collection<ClusterNode> sndResNodes) {
         try {
@@ -2138,7 +2149,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             assert partHistSuppliers.isEmpty() : partHistSuppliers;
 
-            if (!crd.equals(discoCache.serverNodes().get(0)) && !exchCtx.mergeExchanges()) {
+            if (!exchCtx.mergeExchanges() && !crd.equals(events().discoveryCache().serverNodes().get(0))) {
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     if (!grp.isLocal())
                         grp.topology().beforeExchange(this, !centralizedAff, false);
@@ -2165,7 +2176,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     *
+     * @param sndResNodes Additional nodes to send finish message to.
      */
     private void finishExchangeOnCoordinator(@Nullable Collection<ClusterNode> sndResNodes) {
         try {
@@ -2239,13 +2250,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
-            if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
-                assert discoEvt instanceof DiscoveryCustomEvent;
+            if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+                assert firstDiscoEvt instanceof DiscoveryCustomEvent;
 
                 if (activateCluster())
                     assignPartitionsStates();
 
-                if (((DiscoveryCustomEvent)discoEvt).customMessage() instanceof DynamicCacheChangeBatch) {
+                if (((DiscoveryCustomEvent)firstDiscoEvt).customMessage() instanceof DynamicCacheChangeBatch) {
                     if (exchActions != null) {
                         Set<String> caches = exchActions.cachesToResetLostPartitions();
 
@@ -2906,7 +2917,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                         ClusterNode crd0;
 
-                        discoCache.updateAlives(node);
+                        events().discoveryCache().updateAlives(node);
 
                         InitNewCoordinatorFuture newCrdFut0;
 
@@ -3269,8 +3280,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      */
     public String shortInfo() {
         return "GridDhtPartitionsExchangeFuture [topVer=" + initialVersion() +
-            ", evt=" + (discoEvt != null ? IgniteUtils.gridEventName(discoEvt.type()) : -1) +
-            ", evtNode=" + (discoEvt != null ? discoEvt.eventNode() : null) +
+            ", evt=" + (firstDiscoEvt != null ? IgniteUtils.gridEventName(firstDiscoEvt.type()) : -1) +
+            ", evtNode=" + (firstDiscoEvt != null ? firstDiscoEvt.eventNode() : null) +
             ", done=" + isDone() + ']';
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
index 71db8ad..42ce9b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -102,7 +102,7 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture {
         if (restoreState) {
             DiscoCache curDiscoCache = cctx.discovery().discoCache();
 
-            DiscoCache discoCache = exchFut.discoCache();
+            DiscoCache discoCache = exchFut.events().discoveryCache();
 
             List<ClusterNode> nodes = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 7713caa5..0178bb6 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -721,7 +721,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
     /** {@inheritDoc} */
     @Override public void beforeExchange(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
-        DiscoveryEvent discoEvt = fut.discoveryEvent();
+        DiscoveryEvent discoEvt = fut.firstEvent();
 
         boolean joinEvt = discoEvt.type() == EventType.EVT_NODE_JOINED;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3ecc169/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 0d106a4..695d8a6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -2809,7 +2809,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
                 GridDhtPartitionsExchangeFuture fut = futs.get(i);
 
                 if (fut.initialVersion().equals(topVer0)) {
-                    evt = fut.discoveryEvent();
+                    evt = fut.firstEvent();
 
                     break;
                 }