You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/09 08:57:31 UTC

[02/14] ignite git commit: ignite-4296 Optimize GridDhtPartitionsSingleMessage processing - optimized processing of GridDhtPartitionsSingleMessage - minor optimizations for RendezvousAffinityFunction - fixed heartbeats sending in tcp discovery

ignite-4296 Optimize GridDhtPartitionsSingleMessage processing
- optimized processing of GridDhtPartitionsSingleMessage
- minor optimizations for RendezvousAffinityFunction
- fixed heartbeats sending in tcp discovery


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/acf20b32
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/acf20b32
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/acf20b32

Branch: refs/heads/ignite-comm-balance-master
Commit: acf20b32d8fb68e42b904b091fb2b943f4558cef
Parents: b4aedfd
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 5 14:01:28 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 5 14:01:28 2016 +0300

----------------------------------------------------------------------
 .../rendezvous/RendezvousAffinityFunction.java  |  80 ++++--
 .../discovery/GridDiscoveryManager.java         | 118 +-------
 .../GridCachePartitionExchangeManager.java      |   6 +-
 .../processors/cache/GridCacheUtils.java        |  17 --
 .../binary/CacheObjectBinaryProcessorImpl.java  |   3 +-
 .../dht/GridClientPartitionTopology.java        | 120 ++++----
 .../distributed/dht/GridDhtLocalPartition.java  |   1 -
 .../dht/GridDhtPartitionTopology.java           |  28 +-
 .../dht/GridDhtPartitionTopologyImpl.java       | 284 +++++++++++--------
 .../dht/preloader/GridDhtPartitionFullMap.java  |  18 +-
 .../GridDhtPartitionsExchangeFuture.java        |  56 +++-
 .../cache/transactions/IgniteTxHandler.java     |   4 +-
 .../service/GridServiceProcessor.java           |   4 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  40 ++-
 .../tcp/internal/TcpDiscoveryStatistics.java    |   4 +
 .../AbstractAffinityFunctionSelfTest.java       |   2 +-
 .../GridDiscoveryManagerAliveCacheSelfTest.java |   2 +-
 .../GridCachePartitionedAffinitySpreadTest.java |   7 +-
 .../distributed/dht/GridCacheDhtTestUtils.java  | 232 ---------------
 .../h2/twostep/GridReduceQueryExecutor.java     |  14 +-
 20 files changed, 437 insertions(+), 603 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index ec12973..75e7c92 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.cache.affinity.rendezvous;
 
-import java.io.ByteArrayOutputStream;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -354,46 +353,69 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
 
     /**
      * Returns collection of nodes (primary first) for specified partition.
+     *
+     * @param d Message digest.
+     * @param part Partition.
+     * @param nodes Nodes.
+     * @param nodesHash Serialized nodes hashes.
+     * @param backups Number of backups.
+     * @param neighborhoodCache Neighborhood.
+     * @return Assignment.
      */
-    public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups,
+    public List<ClusterNode> assignPartition(MessageDigest d,
+        int part,
+        List<ClusterNode> nodes,
+        Map<ClusterNode, byte[]> nodesHash,
+        int backups,
         @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
         if (nodes.size() <= 1)
             return nodes;
 
-        List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>();
+        if (d == null)
+            d = digest.get();
 
-        MessageDigest d = digest.get();
+        List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size());
 
-        for (ClusterNode node : nodes) {
-            Object nodeHash = resolveNodeHash(node);
+        try {
+            for (int i = 0; i < nodes.size(); i++) {
+                ClusterNode node = nodes.get(i);
 
-            try {
-                ByteArrayOutputStream out = new ByteArrayOutputStream();
+                byte[] nodeHashBytes = nodesHash.get(node);
 
-                byte[] nodeHashBytes = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+                if (nodeHashBytes == null) {
+                    Object nodeHash = resolveNodeHash(node);
 
-                out.write(U.intToBytes(part), 0, 4); // Avoid IOException.
-                out.write(nodeHashBytes, 0, nodeHashBytes.length); // Avoid IOException.
+                    byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+
+                    // Add 4 bytes for partition bytes.
+                    nodeHashBytes = new byte[nodeHashBytes0.length + 4];
+
+                    System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length);
+
+                    nodesHash.put(node, nodeHashBytes);
+                }
+
+                U.intToBytes(part, nodeHashBytes, 0);
 
                 d.reset();
 
-                byte[] bytes = d.digest(out.toByteArray());
+                byte[] bytes = d.digest(nodeHashBytes);
 
                 long hash =
-                      (bytes[0] & 0xFFL)
-                    | ((bytes[1] & 0xFFL) << 8)
-                    | ((bytes[2] & 0xFFL) << 16)
-                    | ((bytes[3] & 0xFFL) << 24)
-                    | ((bytes[4] & 0xFFL) << 32)
-                    | ((bytes[5] & 0xFFL) << 40)
-                    | ((bytes[6] & 0xFFL) << 48)
-                    | ((bytes[7] & 0xFFL) << 56);
+                    (bytes[0] & 0xFFL)
+                        | ((bytes[1] & 0xFFL) << 8)
+                        | ((bytes[2] & 0xFFL) << 16)
+                        | ((bytes[3] & 0xFFL) << 24)
+                        | ((bytes[4] & 0xFFL) << 32)
+                        | ((bytes[5] & 0xFFL) << 40)
+                        | ((bytes[6] & 0xFFL) << 48)
+                        | ((bytes[7] & 0xFFL) << 56);
 
                 lst.add(F.t(hash, node));
             }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
         }
 
         Collections.sort(lst, COMPARATOR);
@@ -474,8 +496,18 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
         Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
             GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
 
+        MessageDigest d = digest.get();
+
+        List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+
+        Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size());
+
         for (int i = 0; i < parts; i++) {
-            List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(),
+            List<ClusterNode> partAssignment = assignPartition(d,
+                i,
+                nodes,
+                nodesHash,
+                affCtx.backups(),
                 neighborhoodCache);
 
             assignments.add(partAssignment);

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index d24f900..ddd4ee3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -42,7 +42,7 @@ import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -1623,17 +1623,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /**
      * Gets cache remote nodes for cache with given name.
      *
-     * @param cacheName Cache name.
-     * @param topVer Topology version.
-     * @return Collection of cache nodes.
-     */
-    public Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(cacheName, topVer).remoteCacheNodes(cacheName, topVer.topologyVersion());
-    }
-
-    /**
-     * Gets cache remote nodes for cache with given name.
-     *
      * @param topVer Topology version.
      * @return Collection of cache nodes.
      */
@@ -1648,7 +1637,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @param topVer Topology version.
      * @return Collection of cache nodes.
      */
-    public Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
+    Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
         return resolveDiscoCache(cacheName, topVer).aliveCacheNodes(cacheName, topVer.topologyVersion());
     }
 
@@ -1659,38 +1648,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @param topVer Topology version.
      * @return Collection of cache nodes.
      */
-    public Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
+    Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
         return resolveDiscoCache(cacheName, topVer).aliveRemoteCacheNodes(cacheName, topVer.topologyVersion());
     }
 
     /**
-     * Gets alive remote server nodes with at least one cache configured.
-     *
      * @param topVer Topology version (maximum allowed node order).
-     * @return Collection of alive cache nodes.
+     * @return Oldest alive server node with at least one cache configured, or {@code null} if there is none.
      */
-    public Collection<ClusterNode> aliveRemoteServerNodesWithCaches(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(null, topVer).aliveRemoteServerNodesWithCaches(topVer.topologyVersion());
-    }
+    @Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) {
+        DiscoCache cache = resolveDiscoCache(null, topVer);
 
-    /**
-     * Gets alive server nodes with at least one cache configured.
-     *
-     * @param topVer Topology version (maximum allowed node order).
-     * @return Collection of alive cache nodes.
-     */
-    public Collection<ClusterNode> aliveServerNodesWithCaches(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(null, topVer).aliveServerNodesWithCaches(topVer.topologyVersion());
-    }
+        Map.Entry<ClusterNode, Boolean> e = cache.aliveSrvNodesWithCaches.firstEntry();
 
-    /**
-     * Gets alive nodes with at least one cache configured.
-     *
-     * @param topVer Topology version (maximum allowed node order).
-     * @return Collection of alive cache nodes.
-     */
-    public Collection<ClusterNode> aliveNodesWithCaches(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(null, topVer).aliveNodesWithCaches(topVer.topologyVersion());
+        return e != null ? e.getKey() : null;
     }
 
     /**
@@ -2580,19 +2551,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes;
 
         /**
-         * Cached alive remote nodes with caches.
-         */
-        private final Collection<ClusterNode> aliveNodesWithCaches;
-
-        /**
          * Cached alive server remote nodes with caches.
          */
-        private final Collection<ClusterNode> aliveSrvNodesWithCaches;
-
-        /**
-         * Cached alive remote server nodes with caches.
-         */
-        private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches;
+        private final ConcurrentSkipListMap<ClusterNode, Boolean> aliveSrvNodesWithCaches;
 
         /**
          * @param loc Local node.
@@ -2625,9 +2586,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
             aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
             aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
-            aliveNodesWithCaches = new ConcurrentSkipListSet<>();
-            aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>();
-            aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>();
+            aliveSrvNodesWithCaches = new ConcurrentSkipListMap<>(GridNodeOrderComparator.INSTANCE);
             nodesByVer = new TreeMap<>();
 
             long maxOrder0 = 0;
@@ -2681,18 +2640,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     }
                 }
 
-                if (hasCaches) {
-                    if (alive(node.id())) {
-                        aliveNodesWithCaches.add(node);
-
-                        if (!CU.clientNode(node)) {
-                            aliveSrvNodesWithCaches.add(node);
-
-                            if (!loc.id().equals(node.id()))
-                                aliveRmtSrvNodesWithCaches.add(node);
-                        }
-                    }
-                }
+                if (hasCaches && alive(node.id()) && !CU.clientNode(node))
+                    aliveSrvNodesWithCaches.put(node, Boolean.TRUE);
 
                 IgniteProductVersion nodeVer = U.productVersion(node);
 
@@ -2821,17 +2770,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
 
         /**
-         * Gets all remote nodes that have cache with given name.
-         *
-         * @param cacheName Cache name.
-         * @param topVer Topology version.
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, final long topVer) {
-            return filter(topVer, rmtCacheNodes.get(cacheName));
-        }
-
-        /**
          * Gets all remote nodes that have at least one cache configured.
          *
          * @param topVer Topology version.
@@ -2876,36 +2814,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
 
         /**
-         * Gets all alive remote server nodes with at least one cache configured.
-         *
-         * @param topVer Topology version.
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) {
-            return filter(topVer, aliveRmtSrvNodesWithCaches);
-        }
-
-        /**
-         * Gets all alive server nodes with at least one cache configured.
-         *
-         * @param topVer Topology version.
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) {
-            return filter(topVer, aliveSrvNodesWithCaches);
-        }
-
-        /**
-         * Gets all alive remote nodes with at least one cache configured.
-         *
-         * @param topVer Topology version.
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> aliveNodesWithCaches(final long topVer) {
-            return filter(topVer, aliveNodesWithCaches);
-        }
-
-        /**
          * Checks if cache with given name has at least one node with near cache enabled.
          *
          * @param cacheName Cache name.
@@ -2928,9 +2836,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
             filterNodeMap(aliveRmtCacheNodes, leftNode);
 
-            aliveNodesWithCaches.remove(leftNode);
             aliveSrvNodesWithCaches.remove(leftNode);
-            aliveRmtSrvNodesWithCaches.remove(leftNode);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 503b334..7a24aa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -740,7 +740,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * Partition refresh callback.
      */
     private void refreshPartitions() {
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+        ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
         if (oldest == null) {
             if (log.isDebugEnabled())
@@ -1224,7 +1224,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         top = cacheCtx.topology();
 
                     if (top != null)
-                        updated |= top.update(null, entry.getValue(), null) != null;
+                        updated |= top.update(null, entry.getValue(), null);
                 }
 
                 if (!cctx.kernalContext().clientNode() && updated)
@@ -1273,7 +1273,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         top = cacheCtx.topology();
 
                     if (top != null) {
-                        updated |= top.update(null, entry.getValue(), null) != null;
+                        updated |= top.update(null, entry.getValue(), null, true);
 
                         cctx.affinity().checkRebalanceState(top, cacheId);
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 90e428c..d32f4c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -490,23 +490,6 @@ public class GridCacheUtils {
     }
 
     /**
-     * Gets oldest alive server node with at least one cache configured for specified topology version.
-     *
-     * @param ctx Context.
-     * @param topVer Maximum allowed topology version.
-     * @return Oldest alive cache server node.
-     */
-    @Nullable public static ClusterNode oldestAliveCacheServerNode(GridCacheSharedContext ctx,
-        AffinityTopologyVersion topVer) {
-        Collection<ClusterNode> nodes = ctx.discovery().aliveServerNodesWithCaches(topVer);
-
-        if (nodes.isEmpty())
-            return null;
-
-        return oldest(nodes);
-    }
-
-    /**
      * @param nodes Nodes.
      * @return Oldest node for the given topology version.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 568a4da..1d60c42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -304,8 +304,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
             assert !metaDataCache.context().affinityNode();
 
             while (true) {
-                ClusterNode oldestSrvNode =
-                    CU.oldestAliveCacheServerNode(ctx.cache().context(), AffinityTopologyVersion.NONE);
+                ClusterNode oldestSrvNode = ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
                 if (oldestSrvNode == null)
                     break;

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5efb317..816132d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -271,7 +271,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             removeNode(exchId.nodeId());
 
         // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+        ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
 
         assert oldest != null;
 
@@ -536,7 +536,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
         try {
             assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
-                ", locNodeId=" + cctx.localNodeId() + ", gridName=" + cctx.gridName() + ']';
+                ", locNodeId=" + cctx.localNodeId() +
+                ", gridName=" + cctx.gridName() + ']';
 
             GridDhtPartitionFullMap m = node2part;
 
@@ -549,7 +550,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionFullMap partMap,
         Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
@@ -563,7 +564,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                     log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ']');
 
-                return null;
+                return false;
             }
 
             if (node2part != null && node2part.compareTo(partMap) >= 0) {
@@ -571,7 +572,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                     log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
 
-                return null;
+                return false;
             }
 
             updateSeq.incrementAndGet();
@@ -634,7 +635,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             if (log.isDebugEnabled())
                 log.debug("Partition map after full update: " + fullMapString());
 
-            return null;
+            return false;
         }
         finally {
             lock.writeLock().unlock();
@@ -642,10 +643,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionMap2 parts,
-        Map<Integer, Long> cntrMap) {
+        Map<Integer, Long> cntrMap,
+        boolean checkEvictions) {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
 
@@ -654,29 +655,27 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                 log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
                     ", parts=" + parts + ']');
 
-            return null;
+            return false;
         }
 
         lock.writeLock().lock();
 
         try {
             if (stopping)
-                return null;
+                return false;
 
             if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
                 if (log.isDebugEnabled())
                     log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ']');
 
-                return null;
+                return false;
             }
 
             if (exchId != null)
                 lastExchangeId = exchId;
 
             if (node2part == null) {
-                U.dumpStack(log, "Created invalid: " + node2part);
-
                 // Create invalid partition map.
                 node2part = new GridDhtPartitionFullMap();
             }
@@ -688,43 +687,45 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                     log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId +
                         ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']');
 
-                return null;
+                return false;
             }
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
-            node2part = new GridDhtPartitionFullMap(node2part, updateSeq);
-
-            boolean changed = false;
+            node2part.updateSequence(updateSeq);
 
-            if (cur == null || !cur.equals(parts))
-                changed = true;
+            boolean changed = cur == null || !cur.equals(parts);
 
-            node2part.put(parts.nodeId(), parts);
+            if (changed) {
+                node2part.put(parts.nodeId(), parts);
 
-            part2node = new HashMap<>(part2node);
+                // Add new mappings.
+                for (Integer p : parts.keySet()) {
+                    Set<UUID> ids = part2node.get(p);
 
-            // Add new mappings.
-            for (Integer p : parts.keySet()) {
-                Set<UUID> ids = part2node.get(p);
+                    if (ids == null)
+                        // Initialize HashSet to size 3 in anticipation that there won't be
+                        // more than 3 nodes per partition.
+                        part2node.put(p, ids = U.newHashSet(3));
 
-                if (ids == null)
-                    // Initialize HashSet to size 3 in anticipation that there won't be
-                    // more than 3 nodes per partition.
-                    part2node.put(p, ids = U.newHashSet(3));
+                    ids.add(parts.nodeId());
+                }
 
-                changed |= ids.add(parts.nodeId());
-            }
+                // Remove obsolete mappings.
+                if (cur != null) {
+                    for (Integer p : cur.keySet()) {
+                        if (parts.containsKey(p))
+                            continue;
 
-            // Remove obsolete mappings.
-            if (cur != null) {
-                for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
-                    Set<UUID> ids = part2node.get(p);
+                        Set<UUID> ids = part2node.get(p);
 
-                    if (ids != null)
-                        changed |= ids.remove(parts.nodeId());
+                        if (ids != null)
+                            ids.remove(parts.nodeId());
+                    }
                 }
             }
+            else
+                cur.updateSequence(parts.updateSequence(), parts.topologyVersion());
 
             if (cntrMap != null) {
                 for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
@@ -740,13 +741,18 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             if (log.isDebugEnabled())
                 log.debug("Partition map after single update: " + fullMapString());
 
-            return changed ? localPartitionMap() : null;
+            return changed;
         }
         finally {
             lock.writeLock().unlock();
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void checkEvictions() {
+        // No-op.
+    }
+
     /**
      * Updates value for single partition.
      *
@@ -755,13 +761,12 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
      * @param state State.
      * @param updateSeq Update sequence.
      */
-    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
         assert lock.isWriteLockedByCurrentThread();
         assert nodeId.equals(cctx.localNodeId());
 
         // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+        ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
 
         // If this node became the oldest node.
         if (oldest.id().equals(cctx.localNodeId())) {
@@ -811,7 +816,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         assert nodeId != null;
         assert lock.writeLock().isHeldByCurrentThread();
 
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+        ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
 
         ClusterNode loc = cctx.localNode();
 
@@ -877,18 +882,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridDhtPartitionMap2 partitions(UUID nodeId) {
-        lock.readLock().lock();
-
-        try {
-            return node2part.get(nodeId);
-        }
-        finally {
-            lock.readLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
         lock.readLock().lock();
 
@@ -919,6 +912,27 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean hasMovingPartitions() {
+        lock.readLock().lock();
+
+        try {
+            assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
+                ", locNodeId=" + cctx.localNodeId() +
+                ", gridName=" + cctx.gridName() + ']';
+
+            for (GridDhtPartitionMap2 map : node2part.values()) {
+                if (map.hasMovingPartitions())
+                    return true;
+            }
+
+            return false;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void printMemoryStats(int threshold) {
         X.println(">>>  Cache partition topology stats [grid=" + cctx.gridName() + ", cacheId=" + cacheId + ']');
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 39a3e08..668a1cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -479,7 +479,6 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
         if ((reservations & 0xFFFF) == 0 && casState(reservations, RENTING)) {
                 shouldBeRenting = false;
-
             if (log.isDebugEnabled())
                 log.debug("Moved partition to RENTING state: " + this);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 0f75a5d..14ce1f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -195,6 +195,11 @@ public interface GridDhtPartitionTopology {
     public GridDhtPartitionFullMap partitionMap(boolean onlyActive);
 
     /**
+     * @return {@code True} if at least one cache node has partitions in {@link GridDhtPartitionState#MOVING} state.
+     */
+    public boolean hasMovingPartitions();
+
+    /**
      * @param e Entry removed from cache.
      */
     public void onRemoved(GridDhtCacheEntry e);
@@ -203,9 +208,9 @@ public interface GridDhtPartitionTopology {
      * @param exchId Exchange ID.
      * @param partMap Update partition map.
      * @param cntrMap Partition update counters.
-     * @return Local partition map if there were evictions or {@code null} otherwise.
+     * @return {@code True} if topology state changed.
      */
-    public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, Long> cntrMap);
 
@@ -213,11 +218,18 @@ public interface GridDhtPartitionTopology {
      * @param exchId Exchange ID.
      * @param parts Partitions.
      * @param cntrMap Partition update counters.
-     * @return Local partition map if there were evictions or {@code null} otherwise.
+     * @param checkEvictions Check evictions flag.
+     * @return {@code True} if topology state changed.
      */
-    @Nullable public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionMap2 parts,
-        @Nullable Map<Integer, Long> cntrMap);
+        @Nullable Map<Integer, Long> cntrMap,
+        boolean checkEvictions);
+
+    /**
+     * Checks local partitions for eviction against the current affinity assignment.
+     */
+    public void checkEvictions();
 
     /**
      * @param skipZeros If {@code true} then filters out zero counters.
@@ -238,12 +250,6 @@ public interface GridDhtPartitionTopology {
     public void onEvicted(GridDhtLocalPartition part, boolean updateSeq);
 
     /**
-     * @param nodeId Node to get partitions for.
-     * @return Partitions for node.
-     */
-    @Nullable public GridDhtPartitionMap2 partitions(UUID nodeId);
-
-    /**
      * Prints memory stats.
      *
      * @param threshold Threshold for number of entries.

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index ab573bd..1b4dcc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
-import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -330,8 +329,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void initPartitions(
-        GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException {
+    @Override public void initPartitions(GridDhtPartitionsExchangeFuture exchFut)
+        throws IgniteInterruptedCheckedException
+    {
         U.writeLock(lock);
 
         try {
@@ -356,9 +356,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
         ClusterNode loc = cctx.localNode();
 
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
-
-        assert oldest != null || cctx.kernalContext().clientNode();
+        ClusterNode oldest = currentCoordinator();
 
         GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 
@@ -397,7 +395,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         if (log.isDebugEnabled())
                             log.debug("Owned partition for oldest node: " + locPart);
 
-                        updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                        updateSeq = updateLocal(p, locPart.state(), updateSeq);
                     }
                 }
             }
@@ -419,7 +417,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         if (state.active()) {
                             locPart.rent(false);
 
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                            updateSeq = updateLocal(p, locPart.state(), updateSeq);
 
                             if (log.isDebugEnabled())
                                 log.debug("Evicting partition with rebalancing disabled " +
@@ -443,8 +441,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param updateSeq Update sequence.
      */
     private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) {
-        ClusterNode loc = cctx.localNode();
-
         int num = cctx.affinity().partitions();
 
         for (int p = 0; p < num; p++) {
@@ -454,7 +450,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     // will be created in MOVING state.
                     GridDhtLocalPartition locPart = createPartition(p);
 
-                    updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                    updateSeq = updateLocal(p, locPart.state(), updateSeq);
                 }
             }
             // If this node's map is empty, we pre-create local partitions,
@@ -485,10 +481,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (exchId.isLeft())
                 removeNode(exchId.nodeId());
 
-            // In case if node joins, get topology at the time of joining node.
-            ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
-
-            assert oldest != null || cctx.kernalContext().clientNode();
+            ClusterNode oldest = currentCoordinator();
 
             if (log.isDebugEnabled())
                 log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
@@ -548,8 +541,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
         boolean changed = waitForRent();
 
-        ClusterNode loc = cctx.localNode();
-
         int num = cctx.affinity().partitions();
 
         AffinityTopologyVersion topVer = exchFut.topologyVersion();
@@ -600,7 +591,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                 assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" +
                                     locPart + ']';
 
-                                updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                                updateSeq = updateLocal(p, locPart.state(), updateSeq);
 
                                 changed = true;
 
@@ -620,7 +611,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                     locPart + ", owners = " + owners + ']');
                         }
                         else
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                            updateSeq = updateLocal(p, locPart.state(), updateSeq);
                     }
                 }
                 else {
@@ -630,7 +621,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         if (state == MOVING) {
                             locPart.rent(false);
 
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                            updateSeq = updateLocal(p, locPart.state(), updateSeq);
 
                             changed = true;
 
@@ -803,8 +794,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 map.put(i, part.state());
             }
 
-            return new GridDhtPartitionMap2(cctx.nodeId(), updateSeq.get(), topVer,
-                Collections.unmodifiableMap(map), true);
+            return new GridDhtPartitionMap2(cctx.nodeId(),
+                updateSeq.get(),
+                topVer,
+                Collections.unmodifiableMap(map),
+                true);
         }
         finally {
             lock.readLock().unlock();
@@ -985,7 +979,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
@@ -997,7 +991,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         try {
             if (stopping)
-                return null;
+                return false;
 
             if (cntrMap != null) {
                 for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
@@ -1025,7 +1019,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ']');
 
-                return null;
+                return false;
             }
 
             if (node2part != null && node2part.compareTo(partMap) >= 0) {
@@ -1033,7 +1027,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
 
-                return null;
+                return false;
             }
 
             long updateSeq = this.updateSeq.incrementAndGet();
@@ -1076,7 +1070,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             node2part = partMap;
 
-            Map<Integer, Set<UUID>> p2n = new HashMap<>(cctx.affinity().partitions(), 1.0f);
+            Map<Integer, Set<UUID>> p2n = U.newHashMap(cctx.affinity().partitions());
 
             for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) {
                 for (Integer p : e.getValue().keySet()) {
@@ -1110,7 +1104,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (log.isDebugEnabled())
                 log.debug("Partition map after full update: " + fullMapString());
 
-            return changed ? localPartitionMap() : null;
+            return changed;
         }
         finally {
             lock.writeLock().unlock();
@@ -1118,10 +1112,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionMap2 parts,
-        @Nullable Map<Integer, Long> cntrMap) {
+        @Nullable Map<Integer, Long> cntrMap,
+        boolean checkEvictions) {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
 
@@ -1130,33 +1124,28 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
                     ", parts=" + parts + ']');
 
-            return null;
+            return false;
         }
 
         lock.writeLock().lock();
 
         try {
             if (stopping)
-                return null;
+                return false;
 
             if (cntrMap != null) {
                 for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
-                    Long cntr = this.cntrMap.get(e.getKey());
+                    Integer p = e.getKey();
 
-                    if (cntr == null || cntr < e.getValue())
-                        this.cntrMap.put(e.getKey(), e.getValue());
-                }
-
-                for (int i = 0; i < locParts.length(); i++) {
-                    GridDhtLocalPartition part = locParts.get(i);
+                    Long cntr = this.cntrMap.get(p);
 
-                    if (part == null)
-                        continue;
+                    if (cntr == null || cntr < e.getValue())
+                        this.cntrMap.put(p, e.getValue());
 
-                    Long cntr = cntrMap.get(part.id());
+                    GridDhtLocalPartition part = locParts.get(p);
 
-                    if (cntr != null)
-                        part.updateCounter(cntr);
+                    if (part != null)
+                        part.updateCounter(e.getValue());
                 }
             }
 
@@ -1165,7 +1154,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ']');
 
-                return null;
+                return false;
             }
 
             if (exchId != null)
@@ -1182,60 +1171,91 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId +
                         ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']');
 
-                return null;
+                return false;
             }
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
-            node2part = new GridDhtPartitionFullMap(node2part, updateSeq);
-
-            boolean changed = false;
+            node2part.newUpdateSequence(updateSeq);
 
-            if (cur == null || !cur.equals(parts))
-                changed = true;
+            boolean changed = cur == null || !cur.equals(parts);
 
-            node2part.put(parts.nodeId(), parts);
+            if (changed) {
+                node2part.put(parts.nodeId(), parts);
 
-            part2node = new HashMap<>(part2node);
+                // Add new mappings.
+                for (Integer p : parts.keySet()) {
+                    Set<UUID> ids = part2node.get(p);
 
-            // Add new mappings.
-            for (Integer p : parts.keySet()) {
-                Set<UUID> ids = part2node.get(p);
+                    if (ids == null)
+                        // Initialize HashSet to size 3 in anticipation that there won't be
+                        // more than 3 nodes per partition.
+                        part2node.put(p, ids = U.newHashSet(3));
 
-                if (ids == null)
-                    // Initialize HashSet to size 3 in anticipation that there won't be
-                    // more than 3 nodes per partition.
-                    part2node.put(p, ids = U.newHashSet(3));
+                    ids.add(parts.nodeId());
+                }
 
-                changed |= ids.add(parts.nodeId());
-            }
+                // Remove obsolete mappings.
+                if (cur != null) {
+                    for (Integer p : cur.keySet()) {
+                        if (parts.containsKey(p))
+                            continue;
 
-            // Remove obsolete mappings.
-            if (cur != null) {
-                for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
-                    Set<UUID> ids = part2node.get(p);
+                        Set<UUID> ids = part2node.get(p);
 
-                    if (ids != null)
-                        changed |= ids.remove(parts.nodeId());
+                        if (ids != null)
+                            ids.remove(parts.nodeId());
+                    }
                 }
             }
+            else
+                cur.updateSequence(parts.updateSequence(), parts.topologyVersion());
 
-            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
-
-            if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
-                List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
-
-                changed |= checkEvictions(updateSeq, aff);
-
-                updateRebalanceVersion(aff);
-            }
+            if (checkEvictions)
+                changed |= checkEvictions(updateSeq);
 
             consistencyCheck();
 
             if (log.isDebugEnabled())
                 log.debug("Partition map after single update: " + fullMapString());
 
-            return changed ? localPartitionMap() : null;
+            return changed;
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * @param updateSeq Update sequence.
+     * @return {@code True} if state changed.
+     */
+    private boolean checkEvictions(long updateSeq) {
+        AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+
+        boolean changed = false;
+
+        if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
+            List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+
+            changed = checkEvictions(updateSeq, aff);
+
+            updateRebalanceVersion(aff);
+        }
+
+        return changed;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void checkEvictions() {
+        lock.writeLock().lock();
+
+        try {
+            long updateSeq = this.updateSeq.incrementAndGet();
+
+            node2part.newUpdateSequence(updateSeq);
+
+            checkEvictions(updateSeq);
         }
         finally {
             lock.writeLock().unlock();
@@ -1270,7 +1290,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     if (nodeIds.containsAll(F.nodeIds(affNodes))) {
                         part.rent(false);
 
-                        updateLocal(part.id(), locId, part.state(), updateSeq);
+                        updateSeq = updateLocal(part.id(), part.state(), updateSeq);
 
                         changed = true;
 
@@ -1295,7 +1315,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                 if (locId.equals(n.id())) {
                                     part.rent(false);
 
-                                    updateLocal(part.id(), locId, part.state(), updateSeq);
+                                    updateSeq = updateLocal(part.id(), part.state(), updateSeq);
 
                                     changed = true;
 
@@ -1316,19 +1336,27 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /**
+     * @return Current coordinator node.
+     */
+    @Nullable private ClusterNode currentCoordinator() {
+        ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+
+        assert oldest != null || cctx.kernalContext().clientNode();
+
+        return oldest;
+    }
+
+    /**
      * Updates value for single partition.
      *
      * @param p Partition.
-     * @param nodeId Node ID.
      * @param state State.
      * @param updateSeq Update sequence.
+     * @return Update sequence actually used (may be greater than the passed value).
      */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
-        assert nodeId.equals(cctx.nodeId());
-
-        // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+    private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) {
+        ClusterNode oldest = currentCoordinator();
 
         assert oldest != null || cctx.kernalContext().clientNode();
 
@@ -1338,12 +1366,16 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             if (seq != updateSeq) {
                 if (seq > updateSeq) {
-                    if (this.updateSeq.get() < seq) {
+                    long seq0 = this.updateSeq.get();
+
+                    if (seq0 < seq) {
                         // Update global counter if necessary.
-                        boolean b = this.updateSeq.compareAndSet(this.updateSeq.get(), seq + 1);
+                        boolean b = this.updateSeq.compareAndSet(seq0, seq + 1);
 
-                        assert b : "Invalid update sequence [updateSeq=" + updateSeq + ", seq=" + seq +
-                            ", curUpdateSeq=" + this.updateSeq.get() + ", node2part=" + node2part.toFullString() + ']';
+                        assert b : "Invalid update sequence [updateSeq=" + updateSeq +
+                            ", seq=" + seq +
+                            ", curUpdateSeq=" + this.updateSeq.get() +
+                            ", node2part=" + node2part.toFullString() + ']';
 
                         updateSeq = seq + 1;
                     }
@@ -1355,11 +1387,19 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             }
         }
 
-        GridDhtPartitionMap2 map = node2part.get(nodeId);
+        UUID locNodeId = cctx.localNodeId();
+
+        GridDhtPartitionMap2 map = node2part.get(locNodeId);
 
-        if (map == null)
-            node2part.put(nodeId, map = new GridDhtPartitionMap2(nodeId, updateSeq, topVer,
-                Collections.<Integer, GridDhtPartitionState>emptyMap(), false));
+        if (map == null) {
+            map = new GridDhtPartitionMap2(locNodeId,
+                updateSeq,
+                topVer,
+                Collections.<Integer, GridDhtPartitionState>emptyMap(),
+                false);
+
+            node2part.put(locNodeId, map);
+        }
 
         map.updateSequence(updateSeq, topVer);
 
@@ -1370,7 +1410,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         if (ids == null)
             part2node.put(p, ids = U.newHashSet(3));
 
-        ids.add(nodeId);
+        ids.add(locNodeId);
+
+        return updateSeq;
     }
 
     /**
@@ -1395,8 +1437,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             else
                 node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence());
 
-            part2node = new HashMap<>(part2node);
-
             GridDhtPartitionMap2 parts = node2part.remove(nodeId);
 
             if (parts != null) {
@@ -1418,13 +1458,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public boolean own(GridDhtLocalPartition part) {
-        ClusterNode loc = cctx.localNode();
-
         lock.writeLock().lock();
 
         try {
             if (part.own()) {
-                updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
+                updateLocal(part.id(), part.state(), updateSeq.incrementAndGet());
 
                 consistencyCheck();
 
@@ -1452,7 +1490,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get();
 
-            updateLocal(part.id(), cctx.localNodeId(), part.state(), seq);
+            updateLocal(part.id(), part.state(), seq);
 
             consistencyCheck();
         }
@@ -1462,18 +1500,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridDhtPartitionMap2 partitions(UUID nodeId) {
-        lock.readLock().lock();
-
-        try {
-            return node2part.get(nodeId);
-        }
-        finally {
-            lock.readLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
         lock.readLock().lock();
 
@@ -1526,6 +1552,30 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean hasMovingPartitions() {
+        lock.readLock().lock();
+
+        try {
+            assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
+                ", cache=" + cctx.name() +
+                ", started=" + cctx.started() +
+                ", stopping=" + stopping +
+                ", locNodeId=" + cctx.localNode().id() +
+                ", locName=" + cctx.gridName() + ']';
+
+            for (GridDhtPartitionMap2 map : node2part.values()) {
+                if (map.hasMovingPartitions())
+                    return true;
+            }
+
+            return false;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void printMemoryStats(int threshold) {
         X.println(">>>  Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']');
 
@@ -1607,10 +1657,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (state == match)
                 return true;
 
-            if (matches != null && matches.length > 0)
-                for (GridDhtPartitionState s : matches)
+            if (matches != null && matches.length > 0) {
+                for (GridDhtPartitionState s : matches) {
                     if (state == s)
                         return true;
+                }
+            }
         }
 
         return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
index 8f5ad17..e8860f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
@@ -103,10 +103,13 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
         for (Map.Entry<UUID, GridDhtPartitionMap2> e : m.entrySet()) {
             GridDhtPartitionMap2 part = e.getValue();
 
-            if (onlyActive)
-                put(e.getKey(), new GridDhtPartitionMap2(part.nodeId(), part.updateSequence(), part.topologyVersion(), part.map(), true));
-            else
-                put(e.getKey(), part);
+            GridDhtPartitionMap2 cpy = new GridDhtPartitionMap2(part.nodeId(),
+                part.updateSequence(),
+                part.topologyVersion(),
+                part.map(),
+                onlyActive);
+
+            put(e.getKey(), cpy);
         }
     }
 
@@ -177,6 +180,13 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
 
     /**
      * @param updateSeq New update sequence value.
+     */
+    public void newUpdateSequence(long updateSeq) {
+        this.updateSeq = updateSeq; // Plain overwrite of the field; this variant returns nothing.
+    }
+
+    /**
+     * @param updateSeq New update sequence value.
      * @return Old update sequence value.
      */
     public long updateSequence(long updateSeq) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f391265..e945de9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -112,6 +112,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
     /** */
     @GridToStringExclude
+    private int pendingSingleUpdates;
+
+    /** */
+    @GridToStringExclude
     private List<ClusterNode> srvNodes;
 
     /** */
@@ -1162,13 +1166,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      */
     private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
         boolean allReceived = false;
+        boolean updateSingleMap = false;
 
         synchronized (mux) {
             assert crd != null;
 
             if (crd.isLocal()) {
                 if (remaining.remove(node.id())) {
-                    updatePartitionSingleMap(msg);
+                    updateSingleMap = true;
+
+                    pendingSingleUpdates++;
 
                     allReceived = remaining.isEmpty();
                 }
@@ -1177,8 +1184,42 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 singleMsgs.put(node, msg);
         }
 
-        if (allReceived)
+        if (updateSingleMap) {
+            try {
+                updatePartitionSingleMap(msg);
+            }
+            finally {
+                synchronized (mux) {
+                    assert pendingSingleUpdates > 0;
+
+                    pendingSingleUpdates--;
+
+                    if (pendingSingleUpdates == 0)
+                        mux.notifyAll();
+                }
+            }
+        }
+
+        if (allReceived) {
+            awaitSingleMapUpdates();
+
             onAllReceived(false);
+        }
+    }
+
+    /**
+     * Waits on {@code mux} until {@code pendingSingleUpdates} drops to zero; an interrupt ends the wait with a warning.
+     */
+    private void awaitSingleMapUpdates() {
+        synchronized (mux) {
+            try {
+                while (pendingSingleUpdates > 0) // Re-check the counter after every wakeup.
+                    U.wait(mux);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
+            }
+        }
+    }
 
     /**
@@ -1218,6 +1259,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 }
             }
 
+            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                if (!cacheCtx.isLocal())
+                    cacheCtx.topology().checkEvictions();
+            }
+
             updateLastVersion(cctx.versions().last());
 
             cctx.versions().onExchange(lastVer.get().order());
@@ -1374,7 +1420,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (cacheCtx != null)
                 cacheCtx.topology().update(exchId, entry.getValue(), cntrMap);
             else {
-                ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+                ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
                 if (oldest != null && oldest.isLocal())
                     cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap);
@@ -1395,7 +1441,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
                 cctx.exchange().clientTopology(cacheId, this);
 
-            top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId));
+            top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId), false);
         }
     }
 
@@ -1557,6 +1603,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                         if (crd0.isLocal()) {
                             if (allReceived) {
+                                awaitSingleMapUpdates();
+
                                 onAllReceived(true);
 
                                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index fbd8ce5..cf69264 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -679,14 +679,14 @@ public class IgniteTxHandler {
      * @param req Request.
      * @return Future.
      */
-    @Nullable public IgniteInternalFuture<IgniteInternalTx> processNearTxFinishRequest(UUID nodeId,
+    @Nullable private IgniteInternalFuture<IgniteInternalTx> processNearTxFinishRequest(UUID nodeId,
         GridNearTxFinishRequest req) {
         if (txFinishMsgLog.isDebugEnabled())
             txFinishMsgLog.debug("Received near finish request [txId=" + req.version() + ", node=" + nodeId + ']');
 
         IgniteInternalFuture<IgniteInternalTx> fut = finish(nodeId, null, req);
 
-        assert req.txState() != null || fut.error() != null ||
+        assert req.txState() != null || (fut != null && fut.error() != null) ||
             (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null);
 
         return fut;

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 6c26363..b9b92b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1265,7 +1265,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         try {
             if (!cache.context().affinityNode()) {
                 ClusterNode oldestSrvNode =
-                    CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE);
+                    ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
                 if (oldestSrvNode == null)
                     return new GridEmptyIterator<>();
@@ -1589,7 +1589,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
                 depExe.submit(new BusyRunnable() {
                     @Override public void run0() {
-                        ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer);
+                        ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
 
                         if (oldest != null && oldest.isLocal()) {
                             final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8814745..a660ec8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -129,6 +129,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -174,8 +175,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         IgniteProductVersion.fromString("1.5.0");
 
     /** */
-    private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
-        new LinkedBlockingQueue<Runnable>());
+    private IgniteThreadPoolExecutor utilityPool;
 
     /** Nodes ring. */
     @GridToStringExclude
@@ -297,6 +297,13 @@ class ServerImpl extends TcpDiscoveryImpl {
             spiState = DISCONNECTED;
         }
 
+        utilityPool = new IgniteThreadPoolExecutor("disco-pool",
+            spi.ignite().name(),
+            0,
+            1,
+            2000,
+            new LinkedBlockingQueue<Runnable>());
+
         if (debugMode) {
             if (!log.isInfoEnabled())
                 throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " +
@@ -2403,9 +2410,12 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Connection check threshold. */
         private long connCheckThreshold;
 
+        /** Time ({@code U.currentTimeMillis()}) recorded in {@code processMessage} for the latest ensured message whose sender is not the local node. */
+        private long lastRingMsgTime;
+
         /**
          */
-        protected RingMessageWorker() {
+        RingMessageWorker() {
             super("tcp-disco-msg-worker", 10);
 
             initConnectionCheckFrequency();
@@ -2500,6 +2510,8 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to process.
          */
         @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+            sendHeartbeatMessage();
+
             DebugLogger log = messageLogger(msg);
 
             if (log.isDebugEnabled())
@@ -2508,6 +2520,11 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (debugMode)
                 debugLog(msg, "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
 
+            boolean ensured = spi.ensured(msg);
+
+            if (!locNode.id().equals(msg.senderNodeId()) && ensured)
+                lastRingMsgTime = U.currentTimeMillis();
+
             if (locNode.internalOrder() == 0) {
                 boolean proc = false;
 
@@ -2564,7 +2581,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             else
                 assert false : "Unknown message type: " + msg.getClass().getSimpleName();
 
-            if (spi.ensured(msg) && redirectToClients(msg))
+            if (ensured && redirectToClients(msg))
                 msgHist.add(msg);
 
             if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) {
@@ -5336,12 +5353,9 @@ class ServerImpl extends TcpDiscoveryImpl {
          * Sends heartbeat message if needed.
          */
         private void sendHeartbeatMessage() {
-            if (!isLocalNodeCoordinator())
-                return;
-
             long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis();
 
-            if (elapsed > 0)
+            if (elapsed > 0 || !isLocalNodeCoordinator())
                 return;
 
             TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId());
@@ -5361,7 +5375,9 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
                 lastTimeStatusMsgSent = locNode.lastUpdateTime();
 
-            long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis();
+            long updateTime = Math.max(lastTimeStatusMsgSent, lastRingMsgTime);
+
+            long elapsed = (updateTime + hbCheckFreq) - U.currentTimeMillis();
 
             if (elapsed > 0)
                 return;
@@ -6062,11 +6078,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             TcpDiscoverySpiState state = spiStateCopy();
 
-            long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+            long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
                 spi.getSocketTimeout();
 
             if (state == CONNECTED) {
-                spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+                spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 
                 if (log.isDebugEnabled())
                     log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
@@ -6103,7 +6119,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     // Local node is stopping. Remote node should try next one.
                     res = RES_CONTINUE_JOIN;
 
-                spi.writeToSocket(msg, sock, res, socketTimeout);
+                spi.writeToSocket(msg, sock, res, sockTimeout);
 
                 if (log.isDebugEnabled())
                     log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
index 9e73632..c790644 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
@@ -28,6 +28,8 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
@@ -317,7 +319,9 @@ public class TcpDiscoveryStatistics {
         assert time >= 0 : time;
 
         if (crdSinceTs.get() > 0 &&
+            (msg instanceof TcpDiscoveryCustomEventMessage) ||
             (msg instanceof TcpDiscoveryNodeAddedMessage) ||
+            (msg instanceof TcpDiscoveryNodeAddFinishedMessage) ||
             (msg instanceof TcpDiscoveryNodeLeftMessage) ||
             (msg instanceof TcpDiscoveryNodeFailedMessage)) {
             ringMsgsSndTs.put(msg.id(), U.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
index 878d7d1..43017db 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
@@ -104,6 +104,7 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac
     }
 
     /**
+     * @param backups Number of backups.
      * @throws Exception If failed.
      */
     protected void checkNodeRemoved(int backups) throws Exception {
@@ -247,7 +248,6 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac
         }
     }
 
-
     /**
      * @param assignment Assignment to verify.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
index 390c83e..31b4bc7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
@@ -239,7 +239,7 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
                 GridCacheSharedContext<?, ?> ctx = k.context().cache().context();
 
                 ClusterNode oldest =
-                    GridCacheUtils.oldestAliveCacheServerNode(ctx, new AffinityTopologyVersion(currVer));
+                    ctx.discovery().oldestAliveCacheServerNode(new AffinityTopologyVersion(currVer));
 
                 assertNotNull(oldest);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
index a59ca8b..2d46cf4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
@@ -76,7 +76,12 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe
         Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size());
 
         for (int part = 0; part < aff.getPartitions(); part++) {
-            Collection<ClusterNode> affNodes = aff.assignPartition(part, new ArrayList(nodes), 0, null);
+            Collection<ClusterNode> affNodes = aff.assignPartition(null,
+                part,
+                new ArrayList<>(nodes),
+                new HashMap<ClusterNode, byte[]>(),
+                0,
+                null);
 
             assertEquals(1, affNodes.size());