You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/03 07:22:00 UTC

[22/50] [abbrv] ignite git commit: ignite-4779 Missed discovery data snapshot during exchange processing (do not use discovery manager cache to handle exchange)

ignite-4779 Missed discovery data snapshot during exchange processing (do not use discovery manager cache to handle exchange)

(cherry picked from commit a61a98ad3908770b77d0ffb071effbc92f4d5c5a)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/42346673
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/42346673
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/42346673

Branch: refs/heads/ignite-gg-8.0.3.ea6-clients-test
Commit: 423466736904b55a6d4397aa6268fff5a9ee5ea9
Parents: 7e956ed
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Thu Mar 9 21:01:41 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 9 22:14:18 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java | 310 +++++++++
 .../discovery/GridDiscoveryManager.java         | 680 +++++--------------
 .../eventstorage/DiscoveryEventListener.java    |  33 +
 .../eventstorage/GridEventStorageManager.java   | 162 ++++-
 .../affinity/GridAffinityAssignmentCache.java   |   7 +-
 .../cache/CacheAffinitySharedManager.java       |  35 +-
 .../cache/GridCacheAffinityManager.java         |   3 +-
 .../GridCachePartitionExchangeManager.java      |  73 +-
 .../dht/GridClientPartitionTopology.java        |  20 +-
 .../dht/GridDhtAssignmentFetchFuture.java       |   7 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  40 +-
 .../GridDhtPartitionsExchangeFuture.java        |  30 +-
 .../service/GridServiceProcessor.java           |  21 +-
 .../GridDiscoveryManagerAliveCacheSelfTest.java |  14 -
 14 files changed, 808 insertions(+), 627 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
new file mode 100644
index 0000000..5247ac1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class DiscoCache {
+    /** Local node. */
+    private final ClusterNode loc;
+
+    /** Remote nodes. */
+    private final List<ClusterNode> rmtNodes;
+
+    /** All nodes. */
+    private final List<ClusterNode> allNodes;
+
+    /** All server nodes. */
+    private final List<ClusterNode> srvNodes;
+
+    /** Daemon nodes. */
+    private final List<ClusterNode> daemonNodes;
+
+    /** All server nodes. */
+    private final List<ClusterNode> srvNodesWithCaches;
+
+    /** All nodes with at least one cache configured. */
+    @GridToStringInclude
+    private final List<ClusterNode> allNodesWithCaches;
+
+    /** All remote nodes with at least one cache configured. */
+    @GridToStringInclude
+    private final List<ClusterNode> rmtNodesWithCaches;
+
+    /** Cache nodes by cache name. */
+    @GridToStringInclude
+    private final Map<Integer, List<ClusterNode>> allCacheNodes;
+
+    /** Affinity cache nodes by cache name. */
+    @GridToStringInclude
+    private final Map<Integer, List<ClusterNode>> affCacheNodes;
+
+    /** Node map. */
+    private final Map<UUID, ClusterNode> nodeMap;
+
+    /** Caches where at least one node has near cache enabled. */
+    @GridToStringInclude
+    private final Set<Integer> nearEnabledCaches;
+
+    /** Alive nodes. */
+    private final Set<UUID> alives = new GridConcurrentHashSet<>();
+
+    /**
+     * @param loc Local node.
+     * @param rmtNodes Remote nodes.
+     * @param allNodes All nodes.
+     * @param srvNodes Server nodes.
+     * @param daemonNodes Daemon nodes.
+     * @param srvNodesWithCaches Server nodes with at least one cache configured.
+     * @param allNodesWithCaches All nodes with at least one cache configured.
+     * @param rmtNodesWithCaches Remote nodes with at least one cache configured.
+     * @param allCacheNodes Cache nodes by cache name.
+     * @param affCacheNodes Affinity cache nodes by cache name.
+     * @param nodeMap Node map.
+     * @param nearEnabledCaches Caches where at least one node has near cache enabled.
+     * @param alives Alive nodes.
+     */
+    DiscoCache(ClusterNode loc,
+        List<ClusterNode> rmtNodes,
+        List<ClusterNode> allNodes,
+        List<ClusterNode> srvNodes,
+        List<ClusterNode> daemonNodes,
+        List<ClusterNode> srvNodesWithCaches,
+        List<ClusterNode> allNodesWithCaches,
+        List<ClusterNode> rmtNodesWithCaches,
+        Map<Integer, List<ClusterNode>> allCacheNodes,
+        Map<Integer, List<ClusterNode>> affCacheNodes,
+        Map<UUID, ClusterNode> nodeMap,
+        Set<Integer> nearEnabledCaches,
+        Set<UUID> alives) {
+        this.loc = loc;
+        this.rmtNodes = rmtNodes;
+        this.allNodes = allNodes;
+        this.srvNodes = srvNodes;
+        this.daemonNodes = daemonNodes;
+        this.srvNodesWithCaches = srvNodesWithCaches;
+        this.allNodesWithCaches = allNodesWithCaches;
+        this.rmtNodesWithCaches = rmtNodesWithCaches;
+        this.allCacheNodes = allCacheNodes;
+        this.affCacheNodes = affCacheNodes;
+        this.nodeMap = nodeMap;
+        this.nearEnabledCaches = nearEnabledCaches;
+        this.alives.addAll(alives);
+    }
+
+    /** @return Local node. */
+    public ClusterNode localNode() {
+        return loc;
+    }
+
+    /** @return Remote nodes. */
+    public List<ClusterNode> remoteNodes() {
+        return rmtNodes;
+    }
+
+    /** @return All nodes. */
+    public List<ClusterNode> allNodes() {
+        return allNodes;
+    }
+
+    /** @return Server nodes. */
+    public List<ClusterNode> serverNodes() {
+        return srvNodes;
+    }
+
+    /** @return Daemon nodes. */
+    public List<ClusterNode> daemonNodes() {
+        return daemonNodes;
+    }
+
+    /** @return Server nodes with at least one cache configured. */
+    public List<ClusterNode> serverNodesWithCaches() {
+        return srvNodesWithCaches;
+    }
+
+    /**
+     * Gets all remote nodes that have at least one cache configured.
+     *
+     * @return Collection of nodes.
+     */
+    public List<ClusterNode> remoteNodesWithCaches() {
+        return rmtNodesWithCaches;
+    }
+
+    /**
+     * Gets collection of nodes with at least one cache configured.
+     *
+     * @return Collection of nodes.
+     */
+    public List<ClusterNode> allNodesWithCaches() {
+        return allNodesWithCaches;
+    }
+
+    /**
+     * Gets collection of server nodes with at least one cache configured.
+     *
+     * @return Collection of nodes.
+     */
+    public Collection<ClusterNode> aliveServerNodes() {
+        return F.view(serverNodes(), new P1<ClusterNode>() {
+            @Override public boolean apply(ClusterNode node) {
+                return alives.contains(node.id());
+            }
+        });
+    }
+
+    /**
+     * Gets collection of server nodes with at least one cache configured.
+     *
+     * @return Collection of nodes.
+     */
+    public Collection<ClusterNode> aliveServerNodesWithCaches() {
+        return F.view(serverNodesWithCaches(), new P1<ClusterNode>() {
+            @Override public boolean apply(ClusterNode node) {
+                return alives.contains(node.id());
+            }
+        });
+    }
+
+    /**
+     * @return Oldest alive server node.
+     */
+    public @Nullable ClusterNode oldestAliveServerNode(){
+        Iterator<ClusterNode> it = aliveServerNodes().iterator();
+        return it.hasNext() ? it.next() : null;
+    }
+
+    /**
+     * @return Oldest alive server node with at least one cache configured.
+     */
+    public @Nullable ClusterNode oldestAliveServerNodeWithCache(){
+        Iterator<ClusterNode> it = aliveServerNodesWithCaches().iterator();
+        return it.hasNext() ? it.next() : null;
+    }
+
+    /**
+     * Gets all nodes that have cache with given name.
+     *
+     * @param cacheName Cache name.
+     * @return Collection of nodes.
+     */
+    public List<ClusterNode> cacheNodes(@Nullable String cacheName) {
+        return cacheNodes(CU.cacheId(cacheName));
+    }
+
+    /**
+     * Gets all nodes that have cache with given ID.
+     *
+     * @param cacheId Cache ID.
+     * @return Collection of nodes.
+     */
+    public List<ClusterNode> cacheNodes(Integer cacheId) {
+        return emptyIfNull(allCacheNodes.get(cacheId));
+    }
+
+    /**
+     * Gets all nodes that have cache with given ID and should participate in affinity calculation. With
+     * partitioned cache nodes with near-only cache do not participate in affinity node calculation.
+     *
+     * @param cacheName Cache name.
+     * @return Collection of nodes.
+     */
+    public List<ClusterNode> cacheAffinityNodes(@Nullable String cacheName) {
+        return cacheAffinityNodes(CU.cacheId(cacheName));
+    }
+
+    /**
+     * Gets all nodes that have cache with given ID and should participate in affinity calculation. With
+     * partitioned cache nodes with near-only cache do not participate in affinity node calculation.
+     *
+     * @param cacheId Cache ID.
+     * @return Collection of nodes.
+     */
+    public List<ClusterNode> cacheAffinityNodes(int cacheId) {
+        return emptyIfNull(affCacheNodes.get(cacheId));
+    }
+
+    /**
+     * Checks if cache with given ID has at least one node with near cache enabled.
+     *
+     * @param cacheId Cache ID.
+     * @return {@code True} if cache with given name has at least one node with near cache enabled.
+     */
+    public boolean hasNearCache(int cacheId) {
+        return nearEnabledCaches.contains(cacheId);
+    }
+
+    /**
+     * @param id Node ID.
+     * @return Node.
+     */
+    public @Nullable ClusterNode node(UUID id) {
+        return nodeMap.get(id);
+    }
+
+    /**
+     * Removes left node from alives lists.
+     *
+     * @param rmvd Removed node.
+     */
+    public void updateAlives(ClusterNode rmvd) {
+        alives.remove(rmvd.id());
+    }
+
+    /**
+     * Removes left nodes from cached alives lists.
+     *
+     * @param discovery Discovery manager.
+     */
+    public void updateAlives(GridDiscoveryManager discovery) {
+        for (UUID alive : alives) {
+            if (!discovery.alive(alive))
+                alives.remove(alive);
+        }
+    }
+
+    /**
+     * @param nodes Cluster nodes.
+     * @return Empty collection if nodes list is {@code null}
+     */
+    private List<ClusterNode> emptyIfNull(List<ClusterNode> nodes) {
+        return nodes == null ? Collections.<ClusterNode>emptyList() : nodes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DiscoCache.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 218aff0..960a064 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -35,14 +35,12 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Set;
-import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -79,19 +77,17 @@ import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.processors.service.GridServiceProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
-import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridTuple5;
+import org.apache.ignite.internal.util.lang.GridTuple6;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -100,7 +96,6 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityCredentials;
 import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
@@ -114,6 +109,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiListener;
 import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
 import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
 import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -235,7 +231,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     private long segChkFreq;
 
     /** Local node join to topology event. */
-    private GridFutureAdapter<DiscoveryEvent> locJoinEvt = new GridFutureAdapter<>();
+    private GridFutureAdapter<T2<DiscoveryEvent, DiscoCache>> locJoin = new GridFutureAdapter<>();
 
     /** GC CPU load. */
     private volatile double gcCpuLoad;
@@ -560,20 +556,25 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     }
                 }
 
+                final DiscoCache discoCache;
+
                 // Put topology snapshot into discovery history.
                 // There is no race possible between history maintenance and concurrent discovery
                 // event notifications, since SPI notifies manager about all events from this listener.
                 if (verChanged) {
-                    DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id())));
+                    discoCache = createDiscoCache(locNode, topSnapshot);
 
-                    discoCacheHist.put(nextTopVer, cache);
+                    discoCacheHist.put(nextTopVer, discoCache);
 
-                    boolean set = updateTopologyVersionIfGreater(nextTopVer, cache);
+                    boolean set = updateTopologyVersionIfGreater(nextTopVer, discoCache);
 
                     assert set || topVer == 0 : "Topology version has not been updated [this.topVer=" +
                         topSnap + ", topVer=" + topVer + ", node=" + node +
                         ", evt=" + U.gridEventName(type) + ']';
                 }
+                else
+                    // Current version.
+                    discoCache = discoCache();
 
                 // If this is a local join event, just save it and do not notify listeners.
                 if (type == EVT_NODE_JOINED && node.id().equals(locNode.id())) {
@@ -581,7 +582,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         gridStartTime = getSpi().getGridStartTime();
 
                     updateTopologyVersionIfGreater(new AffinityTopologyVersion(locNode.order()),
-                        new DiscoCache(localNode(), F.view(topSnapshot, F.remoteNodes(locNode.id()))));
+                        discoCache);
 
                     startLatch.countDown();
 
@@ -591,14 +592,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     discoEvt.eventNode(node);
                     discoEvt.type(EVT_NODE_JOINED);
 
-                    discoEvt.topologySnapshot(topVer, new ArrayList<>(
-                        F.viewReadOnly(topSnapshot, new C1<ClusterNode, ClusterNode>() {
-                            @Override public ClusterNode apply(ClusterNode e) {
-                                return e;
-                            }
-                        }, FILTER_DAEMON)));
+                    discoEvt.topologySnapshot(topVer, new ArrayList<>(F.view(topSnapshot, FILTER_DAEMON)));
 
-                    locJoinEvt.onDone(discoEvt);
+                    locJoin.onDone(new T2<>(discoEvt, discoCache));
 
                     return;
                 }
@@ -613,7 +609,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                     ((IgniteKernal)ctx.grid()).onDisconnected();
 
-                    locJoinEvt = new GridFutureAdapter<>();
+                    locJoin = new GridFutureAdapter<>();
 
                     registeredCaches.clear();
 
@@ -626,7 +622,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     topHist.clear();
 
                     topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
-                        new DiscoCache(locNode, Collections.<ClusterNode>emptySet())));
+                        createDiscoCache(locNode, Collections.<ClusterNode>emptySet())));
                 }
                 else if (type == EVT_CLIENT_NODE_RECONNECTED) {
                     assert locNode.isClient() : locNode;
@@ -643,7 +639,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                             try {
                                 fut.get();
 
-                                discoWrk.addEvent(type, nextTopVer, node, topSnapshot, null);
+                                discoWrk.addEvent(type, nextTopVer, node, discoCache, topSnapshot, null);
                             }
                             catch (IgniteException ignore) {
                                 // No-op.
@@ -655,7 +651,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 }
 
                 if (type == EVT_CLIENT_NODE_DISCONNECTED || type == EVT_NODE_SEGMENTED || !ctx.clientDisconnected())
-                    discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg);
+                    discoWrk.addEvent(type, nextTopVer, node, discoCache, topSnapshot, customMsg);
             }
         });
 
@@ -1372,8 +1368,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             U.join(segChkThread, log);
         }
 
-        if (!locJoinEvt.isDone())
-            locJoinEvt.onDone(
+        if (!locJoin.isDone())
+            locJoin.onDone(
                 new IgniteCheckedException("Failed to wait for local node joined event (grid is stopping)."));
     }
 
@@ -1567,7 +1563,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      *
      * @return Discovery collection cache.
      */
-    private DiscoCache discoCache() {
+    public DiscoCache discoCache() {
         Snapshot cur = topSnap.get();
 
         assert cur != null;
@@ -1620,14 +1616,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * @param topVer Topology version.
-     * @return All server nodes for given topology version.
-     */
-    public List<ClusterNode> serverNodes(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(null, topVer).srvNodes;
-    }
-
-    /**
      * Gets node from history for given topology version.
      *
      * @param topVer Topology version.
@@ -1646,7 +1634,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return Collection of cache nodes.
      */
     public Collection<ClusterNode> cacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(cacheName, topVer).cacheNodes(cacheName, topVer.topologyVersion());
+        return resolveDiscoCache(cacheName, topVer).cacheNodes(cacheName);
     }
 
     /**
@@ -1656,7 +1644,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return Collection of cache nodes.
      */
     public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(null, topVer).allNodesWithCaches(topVer.topologyVersion());
+        return resolveDiscoCache(null, topVer).allNodesWithCaches();
     }
 
     /**
@@ -1666,29 +1654,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return Collection of cache nodes.
      */
     public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(null, topVer).remoteCacheNodes(topVer.topologyVersion());
-    }
-
-    /**
-     * Gets cache nodes for cache with given name.
-     *
-     * @param cacheName Cache name.
-     * @param topVer Topology version.
-     * @return Collection of cache nodes.
-     */
-    Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(cacheName, topVer).aliveCacheNodes(cacheName, topVer.topologyVersion());
-    }
-
-    /**
-     * Gets cache remote nodes for cache with given name.
-     *
-     * @param cacheName Cache name.
-     * @param topVer Topology version.
-     * @return Collection of cache nodes.
-     */
-    Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(cacheName, topVer).aliveRemoteCacheNodes(cacheName, topVer.topologyVersion());
+        return resolveDiscoCache(null, topVer).remoteNodesWithCaches();
     }
 
     /**
@@ -1696,11 +1662,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return Oldest alive server nodes with at least one cache configured.
      */
     @Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) {
-        DiscoCache cache = resolveDiscoCache(null, topVer);
-
-        Map.Entry<ClusterNode, Boolean> e = cache.aliveSrvNodesWithCaches.firstEntry();
-
-        return e != null ? e.getKey() : null;
+        return resolveDiscoCache(null, topVer).oldestAliveServerNodeWithCache();
     }
 
     /**
@@ -1711,7 +1673,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return Collection of cache affinity nodes.
      */
     public Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(cacheName, topVer).cacheAffinityNodes(cacheName, topVer.topologyVersion());
+        return resolveDiscoCache(cacheName, topVer).cacheAffinityNodes(cacheName);
     }
 
     /**
@@ -1788,7 +1750,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return {@code True} if cache with given name has at least one node with near cache enabled.
      */
     public boolean hasNearCache(@Nullable String cacheName, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(cacheName, topVer).hasNearCache(cacheName);
+        return resolveDiscoCache(cacheName, topVer).hasNearCache(CU.cacheId(cacheName));
     }
 
     /**
@@ -1874,7 +1836,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /** @return Event that represents a local node joined to topology. */
     public DiscoveryEvent localJoinEvent() {
         try {
-            return locJoinEvt.get();
+            return locJoin.get().get1();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @return Tuple that consists of a local join event and discovery cache at the join time.
+     */
+    public T2<DiscoveryEvent, DiscoCache> localJoin() {
+        try {
+            return locJoin.get();
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
@@ -1948,6 +1922,114 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @param loc Local node.
+     * @param topSnapshot Topology snapshot.
+     * @return Newly created discovery cache.
+     */
+    @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode> topSnapshot) {
+        HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
+        HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size());
+
+        ArrayList<ClusterNode> daemonNodes = new ArrayList<>(topSnapshot.size());
+        ArrayList<ClusterNode> srvNodes = new ArrayList<>(topSnapshot.size());
+        ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size());
+        ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size());
+
+        for (ClusterNode node : topSnapshot) {
+            if (alive(node))
+                alives.add(node.id());
+
+            if (node.isDaemon())
+                daemonNodes.add(node);
+            else {
+                allNodes.add(node);
+
+                if (!node.isLocal())
+                    rmtNodes.add(node);
+
+                if (!CU.clientNode(node))
+                    srvNodes.add(node);
+            }
+
+            nodeMap.put(node.id(), node);
+        }
+
+        assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" +
+            " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']';
+
+        Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
+        Map<Integer, List<ClusterNode>> affCacheNodes = U.newHashMap(allNodes.size());
+
+        Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+        Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+        Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+
+        Set<Integer> nearEnabledCaches = new HashSet<>();
+
+        for (ClusterNode node : allNodes) {
+            assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
+            assert !node.isDaemon();
+
+            for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
+                String cacheName = entry.getKey();
+                CachePredicate filter = entry.getValue();
+
+                if (filter.cacheNode(node)) {
+                    allNodesWithCaches.add(node);
+
+                    if(!CU.clientNode(node))
+                        srvNodesWithCaches.add(node);
+
+                    if (!node.isLocal())
+                        rmtNodesWithCaches.add(node);
+
+                    addToMap(allCacheNodes, cacheName, node);
+
+                    if (filter.dataNode(node))
+                        addToMap(affCacheNodes, cacheName, node);
+
+                    if (filter.nearNode(node))
+                        nearEnabledCaches.add(CU.cacheId(cacheName));
+                }
+            }
+        }
+
+        return new DiscoCache(
+            loc,
+            Collections.unmodifiableList(rmtNodes),
+            Collections.unmodifiableList(allNodes),
+            Collections.unmodifiableList(srvNodes),
+            Collections.unmodifiableList(daemonNodes),
+            U.sealList(srvNodesWithCaches),
+            U.sealList(allNodesWithCaches),
+            U.sealList(rmtNodesWithCaches),
+            Collections.unmodifiableMap(allCacheNodes),
+            Collections.unmodifiableMap(affCacheNodes),
+            Collections.unmodifiableMap(nodeMap),
+            Collections.unmodifiableSet(nearEnabledCaches),
+            alives);
+    }
+
+    /**
+     * Adds node to map.
+     *
+     * @param cacheMap Map to add to.
+     * @param cacheName Cache name.
+     * @param rich Node to add
+     */
+    private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
+        List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
+
+        if (cacheNodes == null) {
+            cacheNodes = new ArrayList<>();
+
+            cacheMap.put(CU.cacheId(cacheName), cacheNodes);
+        }
+
+        cacheNodes.add(rich);
+    }
+
+    /**
      * Updates topology version if current version is smaller than updated.
      *
      * @param updated Updated topology version.
@@ -2048,8 +2130,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     lastChk = now;
 
                     if (!segValid) {
-                        discoWrk.addEvent(EVT_NODE_SEGMENTED, AffinityTopologyVersion.NONE, getSpi().getLocalNode(),
-                            Collections.<ClusterNode>emptyList(), null);
+                        List<ClusterNode> empty = Collections.emptyList();
+
+                        ClusterNode node = getSpi().getLocalNode();
+
+                        discoWrk.addEvent(EVT_NODE_SEGMENTED,
+                            AffinityTopologyVersion.NONE,
+                            node,
+                            createDiscoCache(node, empty),
+                            empty,
+                            null);
 
                         lastSegChkRes.set(false);
                     }
@@ -2069,8 +2159,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /** Worker for discovery events. */
     private class DiscoveryWorker extends GridWorker {
         /** Event queue. */
-        private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>,
-            DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>();
+        private final BlockingQueue<GridTuple6<Integer, AffinityTopologyVersion, ClusterNode,
+            DiscoCache, Collection<ClusterNode>, DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>();
 
         /** Node segmented event fired flag. */
         private boolean nodeSegFired;
@@ -2088,10 +2178,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
          * @param type Discovery event type. See {@link DiscoveryEvent} for more details.
          * @param topVer Topology version.
          * @param node Remote node this event is connected with.
+         * @param discoCache Discovery cache.
          * @param topSnapshot Topology snapshot.
          */
         @SuppressWarnings("RedundantTypeArguments")
-        private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) {
+        private void recordEvent(int type, long topVer, ClusterNode node, DiscoCache discoCache, Collection<ClusterNode> topSnapshot) {
             assert node != null;
 
             if (ctx.event().isRecordable(type)) {
@@ -2100,7 +2191,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 evt.node(ctx.discovery().localNode());
                 evt.eventNode(node);
                 evt.type(type);
-
                 evt.topologySnapshot(topVer, U.<ClusterNode, ClusterNode>arrayList(topSnapshot, FILTER_DAEMON));
 
                 if (type == EVT_NODE_METRICS_UPDATED)
@@ -2127,7 +2217,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 else
                     assert false;
 
-                ctx.event().record(evt);
+                ctx.event().record(evt, discoCache);
             }
         }
 
@@ -2135,6 +2225,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
          * @param type Event type.
          * @param topVer Topology version.
          * @param node Node.
+         * @param discoCache Discovery cache.
          * @param topSnapshot Topology snapshot.
          * @param data Custom message.
          */
@@ -2142,12 +2233,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             int type,
             AffinityTopologyVersion topVer,
             ClusterNode node,
+            DiscoCache discoCache,
             Collection<ClusterNode> topSnapshot,
             @Nullable DiscoveryCustomMessage data
         ) {
             assert node != null : data;
 
-            evts.add(new GridTuple5<>(type, topVer, node, topSnapshot, data));
+            evts.add(new GridTuple6<>(type, topVer, node, discoCache, topSnapshot, data));
         }
 
         /**
@@ -2184,7 +2276,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         /** @throws InterruptedException If interrupted. */
         @SuppressWarnings("DuplicateCondition")
         private void body0() throws InterruptedException {
-            GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>,
+            GridTuple6<Integer, AffinityTopologyVersion, ClusterNode, DiscoCache, Collection<ClusterNode>,
                 DiscoveryCustomMessage> evt = evts.take();
 
             int type = evt.get1();
@@ -2317,11 +2409,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         customEvt.node(ctx.discovery().localNode());
                         customEvt.eventNode(node);
                         customEvt.type(type);
-                        customEvt.topologySnapshot(topVer.topologyVersion(), evt.get4());
+                        customEvt.topologySnapshot(topVer.topologyVersion(), evt.get5());
                         customEvt.affinityTopologyVersion(topVer);
-                        customEvt.customMessage(evt.get5());
+                        customEvt.customMessage(evt.get6());
 
-                        ctx.event().record(customEvt);
+                        ctx.event().record(customEvt, evt.get4());
                     }
 
                     return;
@@ -2335,7 +2427,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     assert false : "Invalid discovery event: " + type;
             }
 
-            recordEvent(type, topVer.topologyVersion(), node, evt.get4());
+            recordEvent(type, topVer.topologyVersion(), node, evt.get4(), evt.get5());
 
             if (segmented)
                 onSegmentation();
@@ -2545,432 +2637,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
     }
 
-    /** Cache for discovery collections. */
-    private class DiscoCache {
-        /** Remote nodes. */
-        private final List<ClusterNode> rmtNodes;
-
-        /** All nodes. */
-        private final List<ClusterNode> allNodes;
-
-        /** All server nodes. */
-        private final List<ClusterNode> srvNodes;
-
-        /** All nodes with at least one cache configured. */
-        @GridToStringInclude
-        private final Collection<ClusterNode> allNodesWithCaches;
-
-        /** All nodes with at least one cache configured. */
-        @GridToStringInclude
-        private final Collection<ClusterNode> rmtNodesWithCaches;
-
-        /** Cache nodes by cache name. */
-        @GridToStringInclude
-        private final Map<String, Collection<ClusterNode>> allCacheNodes;
-
-        /** Remote cache nodes by cache name. */
-        @GridToStringInclude
-        private final Map<String, Collection<ClusterNode>> rmtCacheNodes;
-
-        /** Cache nodes by cache name. */
-        @GridToStringInclude
-        private final Map<String, Collection<ClusterNode>> affCacheNodes;
-
-        /** Caches where at least one node has near cache enabled. */
-        @GridToStringInclude
-        private final Set<String> nearEnabledCaches;
-
-        /** Nodes grouped by version. */
-        private final NavigableMap<IgniteProductVersion, Collection<ClusterNode>> nodesByVer;
-
-        /** Daemon nodes. */
-        private final List<ClusterNode> daemonNodes;
-
-        /** Node map. */
-        private final Map<UUID, ClusterNode> nodeMap;
-
-        /** Local node. */
-        private final ClusterNode loc;
-
-        /** Highest node order. */
-        private final long maxOrder;
-
-        /**
-         * Cached alive nodes list. As long as this collection doesn't accept {@code null}s use {@link
-         * #maskNull(String)} before passing raw cache names to it.
-         */
-        private final ConcurrentMap<String, Collection<ClusterNode>> aliveCacheNodes;
-
-        /**
-         * Cached alive remote nodes list. As long as this collection doesn't accept {@code null}s use {@link
-         * #maskNull(String)} before passing raw cache names to it.
-         */
-        private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes;
-
-        /**
-         * Cached alive server remote nodes with caches.
-         */
-        private final ConcurrentSkipListMap<ClusterNode, Boolean> aliveSrvNodesWithCaches;
-
-        /**
-         * @param loc Local node.
-         * @param rmts Remote nodes.
-         */
-        private DiscoCache(ClusterNode loc, Collection<ClusterNode> rmts) {
-            this.loc = loc;
-
-            rmtNodes = Collections.unmodifiableList(new ArrayList<>(F.view(rmts, FILTER_DAEMON)));
-
-            assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" +
-                " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']';
-
-            List<ClusterNode> all = new ArrayList<>(rmtNodes.size() + 1);
-
-            if (!loc.isDaemon())
-                all.add(loc);
-
-            all.addAll(rmtNodes);
-
-            Collections.sort(all, GridNodeOrderComparator.INSTANCE);
-
-            allNodes = Collections.unmodifiableList(all);
-
-            Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f);
-            Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f);
-            Map<String, Collection<ClusterNode>> dhtNodesMap = new HashMap<>(allNodes.size(), 1.0f);
-            Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size());
-            Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size());
-
-            aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
-            aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
-            aliveSrvNodesWithCaches = new ConcurrentSkipListMap<>(GridNodeOrderComparator.INSTANCE);
-            nodesByVer = new TreeMap<>();
-
-            long maxOrder0 = 0;
-
-            Set<String> nearEnabledSet = new HashSet<>();
-
-            List<ClusterNode> srvNodes = new ArrayList<>();
-
-            for (ClusterNode node : allNodes) {
-                assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
-                assert !node.isDaemon();
-
-                if (!CU.clientNode(node))
-                    srvNodes.add(node);
-
-                if (node.order() > maxOrder0)
-                    maxOrder0 = node.order();
-
-                boolean hasCaches = false;
-
-                for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
-                    String cacheName = entry.getKey();
-
-                    CachePredicate filter = entry.getValue();
-
-                    if (filter.cacheNode(node)) {
-                        nodesWithCaches.add(node);
-
-                        if (!loc.id().equals(node.id()))
-                            rmtNodesWithCaches.add(node);
-
-                        addToMap(cacheMap, cacheName, node);
-
-                        if (alive(node.id()))
-                            addToMap(aliveCacheNodes, maskNull(cacheName), node);
-
-                        if (filter.dataNode(node))
-                            addToMap(dhtNodesMap, cacheName, node);
-
-                        if (filter.nearNode(node))
-                            nearEnabledSet.add(cacheName);
-
-                        if (!loc.id().equals(node.id())) {
-                            addToMap(rmtCacheMap, cacheName, node);
-
-                            if (alive(node.id()))
-                                addToMap(aliveRmtCacheNodes, maskNull(cacheName), node);
-                        }
-
-                        hasCaches = true;
-                    }
-                }
-
-                if (hasCaches && alive(node.id()) && !CU.clientNode(node))
-                    aliveSrvNodesWithCaches.put(node, Boolean.TRUE);
-
-                IgniteProductVersion nodeVer = U.productVersion(node);
-
-                // Create collection for this version if it does not exist.
-                Collection<ClusterNode> nodes = nodesByVer.get(nodeVer);
-
-                if (nodes == null) {
-                    nodes = new ArrayList<>(allNodes.size());
-
-                    nodesByVer.put(nodeVer, nodes);
-                }
-
-                nodes.add(node);
-            }
-
-            Collections.sort(srvNodes, CU.nodeComparator(true));
-
-            // Need second iteration to add this node to all previous node versions.
-            for (ClusterNode node : allNodes) {
-                IgniteProductVersion nodeVer = U.productVersion(node);
-
-                // Get all versions lower or equal node's version.
-                NavigableMap<IgniteProductVersion, Collection<ClusterNode>> updateView =
-                    nodesByVer.headMap(nodeVer, false);
-
-                for (Collection<ClusterNode> prevVersions : updateView.values())
-                    prevVersions.add(node);
-            }
-
-            maxOrder = maxOrder0;
-
-            allCacheNodes = Collections.unmodifiableMap(cacheMap);
-            rmtCacheNodes = Collections.unmodifiableMap(rmtCacheMap);
-            affCacheNodes = Collections.unmodifiableMap(dhtNodesMap);
-            allNodesWithCaches = Collections.unmodifiableCollection(nodesWithCaches);
-            this.rmtNodesWithCaches = Collections.unmodifiableCollection(rmtNodesWithCaches);
-            nearEnabledCaches = Collections.unmodifiableSet(nearEnabledSet);
-            this.srvNodes = Collections.unmodifiableList(srvNodes);
-
-            daemonNodes = Collections.unmodifiableList(new ArrayList<>(
-                F.view(F.concat(false, loc, rmts), F0.not(FILTER_DAEMON))));
-
-            Map<UUID, ClusterNode> nodeMap = new HashMap<>(allNodes().size() + daemonNodes.size(), 1.0f);
-
-            for (ClusterNode n : F.concat(false, allNodes(), daemonNodes()))
-                nodeMap.put(n.id(), n);
-
-            this.nodeMap = nodeMap;
-        }
-
-        /**
-         * Adds node to map.
-         *
-         * @param cacheMap Map to add to.
-         * @param cacheName Cache name.
-         * @param rich Node to add
-         */
-        private void addToMap(Map<String, Collection<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
-            Collection<ClusterNode> cacheNodes = cacheMap.get(cacheName);
-
-            if (cacheNodes == null) {
-                cacheNodes = new ArrayList<>(allNodes.size());
-
-                cacheMap.put(cacheName, cacheNodes);
-            }
-
-            cacheNodes.add(rich);
-        }
-
-        /** @return Local node. */
-        ClusterNode localNode() {
-            return loc;
-        }
-
-        /** @return Remote nodes. */
-        Collection<ClusterNode> remoteNodes() {
-            return rmtNodes;
-        }
-
-        /** @return All nodes. */
-        Collection<ClusterNode> allNodes() {
-            return allNodes;
-        }
-
-        /**
-         * Gets collection of nodes which have version equal or greater than {@code ver}.
-         *
-         * @param ver Version to check.
-         * @return Collection of nodes with version equal or greater than {@code ver}.
-         */
-        Collection<ClusterNode> elderNodes(IgniteProductVersion ver) {
-            Map.Entry<IgniteProductVersion, Collection<ClusterNode>> entry = nodesByVer.ceilingEntry(ver);
-
-            if (entry == null)
-                return Collections.emptyList();
-
-            return entry.getValue();
-        }
-
-        /**
-         * @return Versions map.
-         */
-        NavigableMap<IgniteProductVersion, Collection<ClusterNode>> versionsMap() {
-            return nodesByVer;
-        }
-
-        /**
-         * Gets collection of nodes with at least one cache configured.
-         *
-         * @param topVer Topology version (maximum allowed node order).
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> allNodesWithCaches(final long topVer) {
-            return filter(topVer, allNodesWithCaches);
-        }
-
-        /**
-         * Gets all nodes that have cache with given name.
-         *
-         * @param cacheName Cache name.
-         * @param topVer Topology version.
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> cacheNodes(@Nullable String cacheName, final long topVer) {
-            return filter(topVer, allCacheNodes.get(cacheName));
-        }
-
-        /**
-         * Gets all remote nodes that have at least one cache configured.
-         *
-         * @param topVer Topology version.
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> remoteCacheNodes(final long topVer) {
-            return filter(topVer, rmtNodesWithCaches);
-        }
-
-        /**
-         * Gets all nodes that have cache with given name and should participate in affinity calculation. With
-         * partitioned cache nodes with near-only cache do not participate in affinity node calculation.
-         *
-         * @param cacheName Cache name.
-         * @param topVer Topology version.
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, final long topVer) {
-            return filter(topVer, affCacheNodes.get(cacheName));
-        }
-
-        /**
-         * Gets all alive nodes that have cache with given name.
-         *
-         * @param cacheName Cache name.
-         * @param topVer Topology version.
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, final long topVer) {
-            return filter(topVer, aliveCacheNodes.get(maskNull(cacheName)));
-        }
-
-        /**
-         * Gets all alive remote nodes that have cache with given name.
-         *
-         * @param cacheName Cache name.
-         * @param topVer Topology version.
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, final long topVer) {
-            return filter(topVer, aliveRmtCacheNodes.get(maskNull(cacheName)));
-        }
-
-        /**
-         * Checks if cache with given name has at least one node with near cache enabled.
-         *
-         * @param cacheName Cache name.
-         * @return {@code True} if cache with given name has at least one node with near cache enabled.
-         */
-        boolean hasNearCache(@Nullable String cacheName) {
-            return nearEnabledCaches.contains(cacheName);
-        }
-
-        /**
-         * Removes left node from cached alives lists.
-         *
-         * @param leftNode Left node.
-         */
-        void updateAlives(ClusterNode leftNode) {
-            if (leftNode.order() > maxOrder)
-                return;
-
-            filterNodeMap(aliveCacheNodes, leftNode);
-
-            filterNodeMap(aliveRmtCacheNodes, leftNode);
-
-            aliveSrvNodesWithCaches.remove(leftNode);
-        }
-
-        /**
-         * Creates a copy of nodes map without the given node.
-         *
-         * @param map Map to copy.
-         * @param exclNode Node to exclude.
-         */
-        private void filterNodeMap(ConcurrentMap<String, Collection<ClusterNode>> map, final ClusterNode exclNode) {
-            for (String cacheName : registeredCaches.keySet()) {
-                String maskedName = maskNull(cacheName);
-
-                while (true) {
-                    Collection<ClusterNode> oldNodes = map.get(maskedName);
-
-                    if (oldNodes == null || oldNodes.isEmpty())
-                        break;
-
-                    Collection<ClusterNode> newNodes = new ArrayList<>(oldNodes);
-
-                    if (!newNodes.remove(exclNode))
-                        break;
-
-                    if (map.replace(maskedName, oldNodes, newNodes))
-                        break;
-                }
-            }
-        }
-
-        /**
-         * Replaces {@code null} with {@code NULL_CACHE_NAME}.
-         *
-         * @param cacheName Cache name.
-         * @return Masked name.
-         */
-        private String maskNull(@Nullable String cacheName) {
-            return cacheName == null ? NULL_CACHE_NAME : cacheName;
-        }
-
-        /**
-         * @param topVer Topology version.
-         * @param nodes Nodes.
-         * @return Filtered collection (potentially empty, but never {@code null}).
-         */
-        private Collection<ClusterNode> filter(final long topVer, @Nullable Collection<ClusterNode> nodes) {
-            if (nodes == null)
-                return Collections.emptyList();
-
-            // If no filtering needed, return original collection.
-            return nodes.isEmpty() || topVer < 0 || topVer >= maxOrder ?
-                nodes :
-                F.view(nodes, new P1<ClusterNode>() {
-                    @Override public boolean apply(ClusterNode node) {
-                        return node.order() <= topVer;
-                    }
-                });
-        }
-
-        /** @return Daemon nodes. */
-        Collection<ClusterNode> daemonNodes() {
-            return daemonNodes;
-        }
-
-        /**
-         * @param id Node ID.
-         * @return Node.
-         */
-        @Nullable ClusterNode node(UUID id) {
-            return nodeMap.get(id);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(DiscoCache.class, this, "allNodesWithDaemons", U.toShortString(allNodes));
-        }
-    }
-
     /**
      * Cache predicate.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java
new file mode 100644
index 0000000..963d97e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/DiscoveryEventListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.eventstorage;
+
+import java.util.EventListener;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+
+/**
+ * Internal listener for discovery events.
+ */
+public interface DiscoveryEventListener extends EventListener {
+    /**
+     * @param evt Discovery event.
+     * @param discoCache Discovery cache.
+     */
+    public void onEvent(DiscoveryEvent evt, DiscoCache discoCache);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index 684e326..f8abd30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
 import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -79,6 +80,9 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
     /** Local event listeners. */
     private final ConcurrentMap<Integer, Set<GridLocalEventListener>> lsnrs = new ConcurrentHashMap8<>();
 
+    /** Internal discovery listeners. */
+    private final ConcurrentMap<Integer, Set<DiscoveryEventListener>> discoLsnrs = new ConcurrentHashMap8<>();
+
     /** Busy lock to control activity of threads. */
     private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
 
@@ -234,6 +238,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         msgLsnr = null;
 
         lsnrs.clear();
+        discoLsnrs.clear();
     }
 
     /** {@inheritDoc} */
@@ -300,6 +305,30 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
     }
 
     /**
+     * Records discovery events.
+     *
+     * @param evt Event to record.
+     * @param discoCache Discovery cache.
+     */
+    public void record(DiscoveryEvent evt, DiscoCache discoCache) {
+        assert evt != null;
+
+        if (!enterBusy())
+            return;
+
+        try {
+            // Notify internal discovery listeners first.
+            notifyDiscoveryListeners(evt, discoCache);
+
+            // Notify all other registered listeners.
+            record(evt);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
      * Gets types of enabled user-recordable events.
      *
      * @return Array of types of enabled user-recordable events.
@@ -570,7 +599,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
 
         try {
             for (int t : types) {
-                getOrCreate(t).add(lsnr);
+                getOrCreate(lsnrs, t).add(lsnr);
 
                 if (!isRecordable(t))
                     U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
@@ -595,14 +624,14 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
             return;
 
         try {
-            getOrCreate(type).add(lsnr);
+            getOrCreate(lsnrs, type).add(lsnr);
 
             if (!isRecordable(type))
                 U.warn(log, "Added listener for disabled event type: " + U.gridEventName(type));
 
             if (types != null) {
                 for (int t : types) {
-                    getOrCreate(t).add(lsnr);
+                    getOrCreate(lsnrs, t).add(lsnr);
 
                     if (!isRecordable(t))
                         U.warn(log, "Added listener for disabled event type: " + U.gridEventName(t));
@@ -615,16 +644,70 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
     }
 
     /**
+     * Adds discovery event listener. Note that this method specifically disallow an empty
+     * array of event type to prevent accidental subscription for all system event that
+     * may lead to a drastic performance decrease.
+     *
+     * @param lsnr Listener to add.
+     * @param types Event types to subscribe listener for.
+     */
+    public void addDiscoveryEventListener(DiscoveryEventListener lsnr, int[] types) {
+        assert lsnr != null;
+        assert types != null;
+        assert types.length > 0;
+
+        if (!enterBusy())
+            return;
+
+        try {
+            for (int t : types) {
+                getOrCreate(discoLsnrs, t).add(lsnr);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Adds discovery event listener.
+     *
+     * @param lsnr Listener to add.
+     * @param type Event type to subscribe listener for.
+     * @param types Additional event types to subscribe listener for.
+     */
+    public void addDiscoveryEventListener(DiscoveryEventListener lsnr, int type, @Nullable int... types) {
+        assert lsnr != null;
+
+        if (!enterBusy())
+            return;
+
+        try {
+            getOrCreate(discoLsnrs, type).add(lsnr);
+
+            if (types != null) {
+                for (int t : types) {
+                    getOrCreate(discoLsnrs, t).add(lsnr);
+                }
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * @param lsnrs Listeners map.
      * @param type Event type.
      * @return Listeners for given event type.
      */
-    private Collection<GridLocalEventListener> getOrCreate(Integer type) {
-        Set<GridLocalEventListener> set = lsnrs.get(type);
+    private <T> Collection<T> getOrCreate(ConcurrentMap<Integer, Set<T>> lsnrs, Integer type) {
+        Set<T> set = lsnrs.get(type);
 
         if (set == null) {
             set = new GridConcurrentLinkedHashSet<>();
 
-            Set<GridLocalEventListener> prev = lsnrs.putIfAbsent(type, set);
+            Set<T> prev = lsnrs.putIfAbsent(type, set);
 
             if (prev != null)
                 set = prev;
@@ -688,6 +771,38 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
     }
 
     /**
+     * Removes listener for specified events, if any. If no event types provided - it
+     * remove the listener for all its registered events.
+     *
+     * @param lsnr Listener.
+     * @param types Event types.
+     * @return Returns {@code true} if removed.
+     */
+    public boolean removeDiscoveryEventListener(DiscoveryEventListener lsnr, @Nullable int... types) {
+        assert lsnr != null;
+
+        boolean found = false;
+
+        if (F.isEmpty(types)) {
+            for (Set<DiscoveryEventListener> set : discoLsnrs.values())
+                if (set.remove(lsnr))
+                    found = true;
+        }
+        else {
+            assert types != null;
+
+            for (int type : types) {
+                Set<DiscoveryEventListener> set = discoLsnrs.get(type);
+
+                if (set != null && set.remove(lsnr))
+                    found = true;
+            }
+        }
+
+        return found;
+    }
+
+    /**
      *
      * @param p Optional predicate.
      * @param types Event types to wait for.
@@ -780,6 +895,41 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
     }
 
     /**
+     * @param evt Discovery event
+     * @param cache Discovery cache.
+     */
+    private void notifyDiscoveryListeners(DiscoveryEvent evt, DiscoCache cache) {
+        assert evt != null;
+
+        notifyDiscoveryListeners(discoLsnrs.get(evt.type()), evt, cache);
+    }
+
+    /**
+     * @param set Set of listeners.
+     * @param evt Discovery event.
+     * @param cache Discovery cache.
+     */
+    private void notifyDiscoveryListeners(@Nullable Collection<DiscoveryEventListener> set, DiscoveryEvent evt, DiscoCache cache) {
+        assert evt != null;
+
+        if (!F.isEmpty(set)) {
+            assert set != null;
+
+            for (DiscoveryEventListener lsnr : set) {
+                try {
+                    lsnr.onEvent(evt, cache);
+                }
+                catch (Throwable e) {
+                    U.error(log, "Unexpected exception in listener notification for event: " + evt, e);
+
+                    if (e instanceof Error)
+                        throw (Error)e;
+                }
+            }
+        }
+    }
+
+    /**
      * @param p Grid event predicate.
      * @return Collection of grid events.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a388c7a..5070462 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -39,6 +39,7 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridNodeOrderComparator;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -252,10 +253,12 @@ public class GridAffinityAssignmentCache {
      *
      * @param topVer Topology version to calculate affinity cache for.
      * @param discoEvt Discovery event that caused this topology version change.
+     * @param discoCache Discovery cache.
      * @return Affinity assignments.
      */
     @SuppressWarnings("IfMayBeConditional")
-    public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) {
+    public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt,
+        DiscoCache discoCache) {
         if (log.isDebugEnabled())
             log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
                 ", discoEvt=" + discoEvt + ']');
@@ -266,7 +269,7 @@ public class GridAffinityAssignmentCache {
         List<ClusterNode> sorted;
 
         if (!locCache) {
-            sorted = new ArrayList<>(ctx.discovery().cacheAffinityNodes(cacheName, topVer));
+            sorted = new ArrayList<>(discoCache.cacheAffinityNodes(cacheId()));
 
             Collections.sort(sorted, GridNodeOrderComparator.INSTANCE);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 459723c..c1dde13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -387,7 +387,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 cctx.cache().prepareCacheStart(req, fut.topologyVersion());
 
                 if (fut.isCacheAdded(cacheId, fut.topologyVersion())) {
-                    if (cctx.discovery().cacheAffinityNodes(req.cacheName(), fut.topologyVersion()).isEmpty())
+                    if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
                         U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
                 }
 
@@ -408,7 +408,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                             assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
 
                             List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
-                                fut.discoveryEvent());
+                                fut.discoveryEvent(), fut.discoCache());
 
                             aff.initialize(fut.topologyVersion(), assignment);
                         }
@@ -768,7 +768,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             assert old == null : old;
 
-            List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent());
+            List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
 
             cache.affinity().initialize(fut.topologyVersion(), newAff);
         }
@@ -806,7 +806,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                     if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
                         List<List<ClusterNode>> assignment =
-                            cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent());
+                            cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
 
                         cache.affinity().initialize(fut.topologyVersion(), assignment);
                     }
@@ -832,14 +832,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private void initAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut, boolean fetch)
         throws IgniteCheckedException {
         if (!fetch && canCalculateAffinity(aff, fut)) {
-            List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent());
+            List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
 
             aff.initialize(fut.topologyVersion(), assignment);
         }
         else {
             GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
                 aff.cacheName(),
-                fut.topologyVersion());
+                fut.topologyVersion(),
+                fut.discoCache());
 
             fetchFut.init();
 
@@ -893,7 +894,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                             CacheHolder cache = cache(fut, cacheDesc);
 
-                            List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent());
+                            List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent(), fut.discoCache());
 
                             cache.affinity().initialize(topVer, newAff);
                         }
@@ -960,14 +961,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             if (cctx.localNodeId().equals(cacheDesc.receivedFrom())) {
                 List<List<ClusterNode>> assignment =
-                    cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent());
+                    cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
 
                 cacheCtx.affinity().affinityCache().initialize(fut.topologyVersion(), assignment);
             }
             else {
                 GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
                     cacheCtx.name(),
-                    topVer);
+                    topVer,
+                    fut.discoCache());
 
                 fetchFut.init();
 
@@ -1001,7 +1003,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         GridDhtAffinityAssignmentResponse res = fetchFut.get();
 
         if (res == null) {
-            List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent());
+            List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
 
             affCache.initialize(topVer, aff);
         }
@@ -1013,7 +1015,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             else {
                 assert !affCache.centralizedAffinityFunction() || !lateAffAssign;
 
-                affCache.calculate(topVer, fut.discoveryEvent());
+                affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
             }
 
             List<List<ClusterNode>> aff = res.affinityAssignment(cctx.discovery());
@@ -1043,7 +1045,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 if (cacheCtx.isLocal())
                     continue;
 
-                cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent());
+                cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
             }
 
             centralizedAff = true;
@@ -1093,7 +1095,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 if (cache != null) {
                     if (cache.client())
-                        cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent());
+                        cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
 
                     return;
                 }
@@ -1133,7 +1135,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                     GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
                         aff.cacheName(),
-                        prev.topologyVersion());
+                        prev.topologyVersion(),
+                        prev.discoCache());
 
                     fetchFut.init();
 
@@ -1144,7 +1147,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                             throws IgniteCheckedException {
                             fetchAffinity(prev, aff, (GridDhtAssignmentFetchFuture)fetchFut);
 
-                            aff.calculate(fut.topologyVersion(), fut.discoveryEvent());
+                            aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
 
                             affFut.onDone(fut.topologyVersion());
                         }
@@ -1283,7 +1286,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         assert aff.idealAssignment() != null : "Previous assignment is not available.";
 
-        List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent());
+        List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
         List<List<ClusterNode>> newAssignment = null;
 
         if (latePrimary) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index a65d971..24321b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
-import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteFuture;
@@ -77,7 +76,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
     @Override protected void onKernalStart0() throws IgniteCheckedException {
         if (cctx.isLocal())
             // No discovery event needed for local affinity.
-            aff.calculate(LOC_CACHE_TOP_VER, null);
+            aff.calculate(LOC_CACHE_TOP_VER, null, null);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/42346673/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 695872f..ff7feb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -48,7 +48,6 @@ import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -57,6 +56,8 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
@@ -181,35 +182,33 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     private DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
 
     /** Discovery listener. */
-    private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
-        @Override public void onEvent(Event evt) {
+    private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
+        @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
             if (!enterBusy())
                 return;
 
             try {
-                DiscoveryEvent e = (DiscoveryEvent)evt;
-
                 ClusterNode loc = cctx.localNode();
 
-                assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED ||
-                    e.type() == EVT_DISCOVERY_CUSTOM_EVT;
+                assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ||
+                    evt.type() == EVT_DISCOVERY_CUSTOM_EVT;
 
-                final ClusterNode n = e.eventNode();
+                final ClusterNode n = evt.eventNode();
 
                 GridDhtPartitionExchangeId exchId = null;
                 GridDhtPartitionsExchangeFuture exchFut = null;
 
-                if (e.type() != EVT_DISCOVERY_CUSTOM_EVT) {
+                if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) {
                     assert !loc.id().equals(n.id());
 
-                    if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) {
+                    if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) {
                         assert cctx.discovery().node(n.id()) == null;
 
                         // Avoid race b/w initial future add and discovery event.
                         GridDhtPartitionsExchangeFuture initFut = null;
 
                         if (readyTopVer.get().equals(AffinityTopologyVersion.NONE)) {
-                            initFut = exchangeFuture(initialExchangeId(), null, null, null);
+                            initFut = exchangeFuture(initialExchangeId(), null, null, null, null);
 
                             initFut.onNodeLeft(n);
                         }
@@ -220,16 +219,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         }
                     }
 
-                    assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() :
+                    assert evt.type() != EVT_NODE_JOINED || n.order() > loc.order() :
                         "Node joined with smaller-than-local " +
                             "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
 
-                    exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
+                    exchId = exchangeId(n.id(),
+                        affinityTopologyVersion(evt),
+                        evt.type());
 
-                    exchFut = exchangeFuture(exchId, e, null, null);
+                    exchFut = exchangeFuture(exchId, evt, cache,null, null);
                 }
                 else {
-                    DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e;
+                    DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
 
                     if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
                         DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
@@ -260,9 +261,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                         //todo think about refactoring
                         if (!F.isEmpty(valid) && !(valid.size() == 1 && valid.iterator().next().globalStateChange())) {
-                            exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
+                            exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
 
-                            exchFut = exchangeFuture(exchId, e, valid, null);
+                            exchFut = exchangeFuture(exchId, evt, cache, valid, null);
                         }
                     }
                     else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
@@ -270,19 +271,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                         if (msg.exchangeId() == null) {
                             if (msg.exchangeNeeded()) {
-                                exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
+                                exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
 
-                                exchFut = exchangeFuture(exchId, e, null, msg);
+                                exchFut = exchangeFuture(exchId, evt, cache, null, msg);
                             }
                         }
                         else
-                            exchangeFuture(msg.exchangeId(), null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+                            exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
                     }
                     else if (customEvt.customMessage() instanceof StartSnapshotOperationAckDiscoveryMessage
                         && !((StartSnapshotOperationAckDiscoveryMessage)customEvt.customMessage()).hasError()) {
-                        exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
+                        exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
 
-                        exchFut = exchangeFuture(exchId, e, null, null);
+                        exchFut = exchangeFuture(exchId, evt, null, null, null);
                     }
                 }
 
@@ -291,7 +292,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         log.debug("Discovery event (will start exchange): " + exchId);
 
                     // Event callback - without this callback future will never complete.
-                    exchFut.onEvent(exchId, e);
+                    exchFut.onEvent(exchId, evt, cache);
 
                     // Start exchange process.
                     addFuture(exchFut);
@@ -313,7 +314,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         exchWorker = new ExchangeWorker();
 
-        cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
+        cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
             EVT_DISCOVERY_CUSTOM_EVT);
 
         cctx.io().addHandler(0, GridDhtPartitionsSingleMessage.class,
@@ -371,11 +372,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         assert startTime > 0;
 
         // Generate dummy discovery event for local node joining.
-        DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent();
+        T2<DiscoveryEvent, DiscoCache> localJoin = cctx.discovery().localJoin();
+
+        DiscoveryEvent discoEvt = localJoin.get1();
+        DiscoCache discoCache = localJoin.get2();
 
         GridDhtPartitionExchangeId exchId = initialExchangeId();
 
-        GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, null, null);
+        GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, discoCache, null, null);
 
         if (reconnect)
             reconnectExchangeFut = new GridFutureAdapter<>();
@@ -482,7 +486,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
-        cctx.gridEvents().removeLocalEventListener(discoLsnr);
+        cctx.gridEvents().removeDiscoveryEventListener(discoLsnr);
 
         cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class);
         cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class);
@@ -1085,12 +1089,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /**
      * @param exchId Exchange ID.
      * @param discoEvt Discovery event.
+     * @param cache Discovery data cache.
      * @param reqs Cache change requests.
      * @param affChangeMsg Affinity change message.
      * @return Exchange future.
      */
     private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
         @Nullable DiscoveryEvent discoEvt,
+        @Nullable DiscoCache cache,
         @Nullable Collection<DynamicCacheChangeRequest> reqs,
         @Nullable CacheAffinityChangeMessage affChangeMsg) {
         GridDhtPartitionsExchangeFuture fut;
@@ -1109,7 +1115,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         }
 
         if (discoEvt != null)
-            fut.onEvent(exchId, discoEvt);
+            fut.onEvent(exchId, discoEvt, cache);
 
         if (stopErr != null)
             fut.onDone(stopErr);
@@ -1265,7 +1271,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     cctx.database().releaseHistoryForPreloading();
             }
             else
-                exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg);
+                exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg);
         }
         finally {
             leaveBusy();
@@ -1318,8 +1324,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             }
             else {
                 if (msg.client()) {
-                    final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(
-                        msg.exchangeId(), null, null, null);
+                    final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
+                        null,
+                        null,
+                        null,
+                        null);
 
                     exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
@@ -1329,7 +1338,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     });
                 }
                 else
-                    exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg);
+                    exchangeFuture(msg.exchangeId(), null, null, null, null).onReceive(node, msg);
             }
         }
         finally {