You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/04/13 11:55:50 UTC

ignite git commit: ignite-3018 Cache affinity calculation is slow with large nodes number

Repository: ignite
Updated Branches:
  refs/heads/master 76485fc3e -> 027b2c27c


ignite-3018 Cache affinity calculation is slow with large nodes number


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/027b2c27
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/027b2c27
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/027b2c27

Branch: refs/heads/master
Commit: 027b2c27c8824ef1498883dff3af9d5be37a80b5
Parents: 76485fc
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Thu Apr 13 14:55:39 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Thu Apr 13 14:55:39 2017 +0300

----------------------------------------------------------------------
 .../rendezvous/RendezvousAffinityFunction.java  |  283 +++--
 .../GridCachePartitionExchangeManager.java      |   20 +-
 ...inityFunctionFastPowerOfTwoHashSelfTest.java |   17 -
 ...ndezvousAffinityFunctionSimpleBenchmark.java | 1100 ++++++++++++++++++
 ...ousAffinityFunctionStandardHashSelfTest.java |   17 -
 .../IgniteClientReconnectCacheTest.java         |   16 +-
 .../internal/binary/BinaryEnumsSelfTest.java    |    2 +
 .../GridCachePartitionedAffinitySpreadTest.java |  169 ---
 ...dCachePartitionedQueueEntryMoveSelfTest.java |    2 +-
 ...ridCachePartitionNotLoadedEventSelfTest.java |    2 +
 .../near/GridCacheNearTxForceKeyTest.java       |    6 +-
 ...cheRebalancingPartitionDistributionTest.java |    2 +-
 ...gniteServiceConfigVariationsFullApiTest.java |    9 +-
 .../IgniteServiceDynamicCachesSelfTest.java     |   12 +-
 .../ignite/testframework/GridTestNode.java      |   12 +-
 ...PartitionOnAffinityRunAtomicCacheOpTest.java |   46 +-
 16 files changed, 1364 insertions(+), 351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index dcac7d4..9c84f00 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -22,18 +22,16 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.UUID;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityFunctionContext;
@@ -48,7 +46,6 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
 import org.jetbrains.annotations.Nullable;
 
@@ -84,20 +81,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
     /** Comparator. */
     private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator();
 
-    /** Thread local message digest. */
-    private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() {
-        @Override protected MessageDigest initialValue() {
-            try {
-                return MessageDigest.getInstance("MD5");
-            }
-            catch (NoSuchAlgorithmException e) {
-                assert false : "Should have failed in constructor";
-
-                throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", e);
-            }
-        }
-    };
-
     /** Number of partitions. */
     private int parts;
 
@@ -121,10 +104,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
     /** Hash ID resolver. */
     private AffinityNodeHashResolver hashIdRslvr = null;
 
-    /** Ignite instance. */
-    @IgniteInstanceResource
-    private Ignite ignite;
-
     /** Logger instance. */
     @LoggerResource
     private transient IgniteLogger log;
@@ -195,13 +174,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
         setPartitions(parts);
 
         this.backupFilter = backupFilter;
-
-        try {
-            MessageDigest.getInstance("MD5");
-        }
-        catch (NoSuchAlgorithmException e) {
-            throw new IgniteException("Failed to obtain MD5 message digest instance.", e);
-        }
     }
 
     /**
@@ -382,116 +354,94 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
     /**
      * Returns collection of nodes (primary first) for specified partition.
      *
-     * @param d Message digest.
      * @param part Partition.
      * @param nodes Nodes.
-     * @param nodesHash Serialized nodes hashes.
      * @param backups Number of backups.
      * @param neighborhoodCache Neighborhood.
      * @return Assignment.
      */
-    public List<ClusterNode> assignPartition(MessageDigest d,
-        int part,
+    public List<ClusterNode> assignPartition(int part,
         List<ClusterNode> nodes,
-        Map<ClusterNode, byte[]> nodesHash,
         int backups,
         @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
         if (nodes.size() <= 1)
             return nodes;
 
-        if (d == null)
-            d = digest.get();
-
-        List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size());
-
-        try {
-            for (int i = 0; i < nodes.size(); i++) {
-                ClusterNode node = nodes.get(i);
+        IgniteBiTuple<Long, ClusterNode> [] hashArr =
+            (IgniteBiTuple<Long, ClusterNode> [])new IgniteBiTuple[nodes.size()];
 
-                byte[] nodeHashBytes = nodesHash.get(node);
+        for (int i = 0; i < nodes.size(); i++) {
+            ClusterNode node = nodes.get(i);
 
-                if (nodeHashBytes == null) {
-                    Object nodeHash = resolveNodeHash(node);
+            Object nodeHash = resolveNodeHash(node);
 
-                    byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+            long hash = hash(nodeHash.hashCode(), part);
 
-                    // Add 4 bytes for partition bytes.
-                    nodeHashBytes = new byte[nodeHashBytes0.length + 4];
-
-                    System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length);
-
-                    nodesHash.put(node, nodeHashBytes);
-                }
-
-                U.intToBytes(part, nodeHashBytes, 0);
-
-                d.reset();
-
-                byte[] bytes = d.digest(nodeHashBytes);
+            hashArr[i] = F.t(hash, node);
+        }
 
-                long hash =
-                    (bytes[0] & 0xFFL)
-                        | ((bytes[1] & 0xFFL) << 8)
-                        | ((bytes[2] & 0xFFL) << 16)
-                        | ((bytes[3] & 0xFFL) << 24)
-                        | ((bytes[4] & 0xFFL) << 32)
-                        | ((bytes[5] & 0xFFL) << 40)
-                        | ((bytes[6] & 0xFFL) << 48)
-                        | ((bytes[7] & 0xFFL) << 56);
+        final int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size());
 
-                lst.add(F.t(hash, node));
-            }
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
+        Iterable<ClusterNode> sortedNodes = new LazyLinearSortedContainer(hashArr, primaryAndBackups);
 
-        Collections.sort(lst, COMPARATOR);
+        // REPLICATED cache case
+        if (backups == Integer.MAX_VALUE)
+            return replicatedAssign(nodes, sortedNodes);
 
-        int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size());
+        Iterator<ClusterNode> it = sortedNodes.iterator();
 
         List<ClusterNode> res = new ArrayList<>(primaryAndBackups);
 
-        ClusterNode primary = lst.get(0).get2();
+        Collection<ClusterNode> allNeighbors = new HashSet<>();
+
+        ClusterNode primary = it.next();
 
         res.add(primary);
 
+        if (exclNeighbors)
+            allNeighbors.addAll(neighborhoodCache.get(primary.id()));
+
         // Select backups.
         if (backups > 0) {
-            for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
-                IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
-
-                ClusterNode node = next.get2();
+            while (it.hasNext() && res.size() < primaryAndBackups) {
+                ClusterNode node = it.next();
 
                 if (exclNeighbors) {
-                    Collection<ClusterNode> allNeighbors = GridCacheUtils.neighborsForNodes(neighborhoodCache, res);
-
-                    if (!allNeighbors.contains(node))
+                    if (!allNeighbors.contains(node)) {
                         res.add(node);
+
+                        allNeighbors.addAll(neighborhoodCache.get(node.id()));
+                    }
+                }
+                else if ((backupFilter != null && backupFilter.apply(primary, node))
+                    || (affinityBackupFilter != null && affinityBackupFilter.apply(node, res))
+                    || (affinityBackupFilter == null && backupFilter == null) ) {
+                    res.add(node);
+
+                    if (exclNeighbors)
+                        allNeighbors.addAll(neighborhoodCache.get(node.id()));
                 }
-                else if (affinityBackupFilter != null && affinityBackupFilter.apply(node, res))
-                    res.add(next.get2());
-                else if (backupFilter != null && backupFilter.apply(primary, node))
-                    res.add(next.get2());
-                else if (affinityBackupFilter == null && backupFilter == null)
-                    res.add(next.get2());
             }
         }
 
         if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) {
             // Need to iterate again in case if there are no nodes which pass exclude neighbors backups criteria.
-            for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
-                IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
+            it = sortedNodes.iterator();
+
+            it.next();
 
-                ClusterNode node = next.get2();
+            while (it.hasNext() && res.size() < primaryAndBackups) {
+                ClusterNode node = it.next();
 
                 if (!res.contains(node))
-                    res.add(next.get2());
+                    res.add(node);
             }
 
             if (!exclNeighborsWarn) {
                 LT.warn(log, "Affinity function excludeNeighbors property is ignored " +
-                    "because topology has no enough nodes to assign backups.");
+                        "because topology has no enough nodes to assign backups.",
+                    "Affinity function excludeNeighbors property is ignored " +
+                        "because topology has no enough nodes to assign backups.");
 
                 exclNeighborsWarn = true;
             }
@@ -502,6 +452,53 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
         return res;
     }
 
+    /**
+     * Creates assignment for REPLICATED cache
+     *
+     * @param nodes Topology.
+     * @param sortedNodes Nodes sorted for the specified partition.
+     * @return Assignment.
+     */
+    private List<ClusterNode> replicatedAssign(List<ClusterNode> nodes, Iterable<ClusterNode> sortedNodes) {
+        ClusterNode primary = sortedNodes.iterator().next();
+
+        List<ClusterNode> res = new ArrayList<>(nodes.size());
+
+        res.add(primary);
+
+        for (ClusterNode n : nodes)
+            if (!n.equals(primary))
+                res.add(n);
+
+        assert res.size() == nodes.size() : "Not enough backups: " + res.size();
+
+        return res;
+    }
+
+    /**
+     * Packs the partition number and nodeHash.hashCode into a long and mixes it with a hash function based on the
+     * Wang/Jenkins hash.
+     *
+     * @param key0 Hash key.
+     * @param key1 Hash key.
+     * @see <a href="https://gist.github.com/badboy/6267743#64-bit-mix-functions">64 bit mix functions</a>
+     * @return Long hash key.
+     */
+    private static long hash(int key0, int key1) {
+        long key = (key0 & 0xFFFFFFFFL)
+            | ((key1 & 0xFFFFFFFFL) << 32);
+
+        key = (~key) + (key << 21); // key = (key << 21) - key - 1;
+        key ^= (key >>> 24);
+        key += (key << 3) + (key << 8); // key * 265
+        key ^= (key >>> 14);
+        key += (key << 2) + (key << 4); // key * 21
+        key ^= (key >>> 28);
+        key += (key << 31);
+
+        return key;
+    }
+
     /** {@inheritDoc} */
     @Override public void reset() {
         // No-op.
@@ -534,19 +531,10 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
         Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
             GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
 
-        MessageDigest d = digest.get();
-
         List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
 
-        Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size());
-
         for (int i = 0; i < parts; i++) {
-            List<ClusterNode> partAssignment = assignPartition(d,
-                i,
-                nodes,
-                nodesHash,
-                affCtx.backups(),
-                neighborhoodCache);
+            List<ClusterNode> partAssignment = assignPartition(i, nodes, affCtx.backups(), neighborhoodCache);
 
             assignments.add(partAssignment);
         }
@@ -590,4 +578,83 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
                 o1.get2().id().compareTo(o2.get2().id());
         }
     }
+
+    /**
+     * Iterates the initial array in sorted order, selecting each next element by a linear scan unless pre-sorted.
+     */
+    private static class LazyLinearSortedContainer implements Iterable<ClusterNode> {
+        /** Initial node-hash array. */
+        private final IgniteBiTuple<Long, ClusterNode>[] arr;
+
+        /** Count of the sorted elements */
+        private int sorted;
+
+        /**
+         * @param arr Node / partition hash list.
+         * @param needFirstSortedCnt Estimated count of elements to be returned by the iterator.
+         */
+        LazyLinearSortedContainer(IgniteBiTuple<Long, ClusterNode>[] arr, int needFirstSortedCnt) {
+            this.arr = arr;
+
+            if (needFirstSortedCnt > (int)Math.log(arr.length)) {
+                Arrays.sort(arr, COMPARATOR);
+
+                sorted = arr.length;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<ClusterNode> iterator() {
+            return new SortIterator();
+        }
+
+        /**
+         *
+         */
+        private class SortIterator implements Iterator<ClusterNode> {
+            /** Index of the first unsorted element. */
+            private int cur;
+
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return cur < arr.length;
+            }
+
+            /** {@inheritDoc} */
+            @Override public ClusterNode next() {
+                if (!hasNext())
+                    throw new NoSuchElementException();
+
+                if (cur < sorted)
+                    return arr[cur++].get2();
+
+                IgniteBiTuple<Long, ClusterNode> min = arr[cur];
+
+                int minIdx = cur;
+
+                for (int i = cur + 1; i < arr.length; i++) {
+                    if (COMPARATOR.compare(arr[i], min) < 0) {
+                        minIdx = i;
+
+                        min = arr[i];
+                    }
+                }
+
+                if (minIdx != cur) {
+                    arr[minIdx] = arr[cur];
+
+                    arr[cur] = min;
+                }
+
+                sorted = cur++;
+
+                return min.get2();
+            }
+
+            /** {@inheritDoc} */
+            @Override public void remove() {
+                throw new UnsupportedOperationException("Remove doesn't supported");
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1297c38..9350b2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -781,17 +781,31 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (log.isDebugEnabled())
             log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']');
 
-        Collection<ClusterNode> rmts;
-
         // If this is the oldest node.
         if (oldest.id().equals(cctx.localNodeId())) {
+            // Check rebalance state & send CacheAffinityChangeMessage if needed.
+            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                if (!cacheCtx.isLocal()) {
+                    if (cacheCtx == null)
+                        continue;
+
+                    GridDhtPartitionTopology top = null;
+
+                    if (!cacheCtx.isLocal())
+                        top = cacheCtx.topology();
+
+                    if (top != null)
+                        cctx.affinity().checkRebalanceState(top, cacheCtx.cacheId());
+                }
+            }
+
             GridDhtPartitionsExchangeFuture lastFut = lastInitializedFut;
 
             // No need to send to nodes which did not finish their first exchange.
             AffinityTopologyVersion rmtTopVer =
                 lastFut != null ? lastFut.topologyVersion() : AffinityTopologyVersion.NONE;
 
-            rmts = CU.remoteNodes(cctx, rmtTopVer);
+            Collection<ClusterNode> rmts = CU.remoteNodes(cctx, rmtTopVer);
 
             if (log.isDebugEnabled())
                 log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());

http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest.java
index 683ffa2..dfebdbd 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest.java
@@ -17,34 +17,17 @@
 
 package org.apache.ignite.cache.affinity.rendezvous;
 
-import org.apache.ignite.Ignite;
 import org.apache.ignite.cache.affinity.AbstractAffinityFunctionSelfTest;
 import org.apache.ignite.cache.affinity.AffinityFunction;
-import org.apache.ignite.testframework.GridTestUtils;
 
 /**
  * Tests for {@link RendezvousAffinityFunction}.
  */
 public class RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest extends AbstractAffinityFunctionSelfTest {
-    /** Ignite. */
-    private static Ignite ignite;
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        ignite = startGrid();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-    }
-
     /** {@inheritDoc} */
     @Override protected AffinityFunction affinityFunction() {
         AffinityFunction aff = new RendezvousAffinityFunction(512, null);
 
-        GridTestUtils.setFieldValue(aff, "ignite", ignite);
-
         return aff;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java
new file mode 100644
index 0000000..3e5bae9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java
@@ -0,0 +1,1100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import java.io.Externalizable;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cache.affinity.AffinityNodeHashResolver;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.GridTestNode;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Simple benchmarks, compatibility test and distribution check utils for affinity functions.
+ * Needed to check changes made to the {@link RendezvousAffinityFunction}.
+ */
+public class RendezvousAffinityFunctionSimpleBenchmark extends GridCommonAbstractTest {
+    /** MAC prefix. */
+    private static final String MAC_PREF = "MAC";
+
+    /** Ignite. */
+    private static Ignite ignite;
+
+    /** Max experiments. */
+    private static final int MAX_EXPERIMENTS = 200;
+
+    /** Topology modification mode. */
+    private TopologyModificationMode mode = TopologyModificationMode.CHANGE_LAST_NODE;
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 3 * 3600 * 1000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        ignite = startGrid();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @param nodesCnt Count of nodes to generate.
+     * @return Nodes list.
+     */
+    private List<ClusterNode> createBaseNodes(int nodesCnt) {
+        List<ClusterNode> nodes = new ArrayList<>(nodesCnt);
+
+        for (int i = 0; i < nodesCnt; i++) {
+            GridTestNode node = new GridTestNode(UUID.randomUUID());
+
+            // Every two consecutive nodes share a MAC, i.e. are neighbours.
+            node.setAttribute(IgniteNodeAttributes.ATTR_MACS, MAC_PREF + i / 2);
+
+            nodes.add(node);
+        }
+        return nodes;
+    }
+
+    /**
+     * Modify the topology by removing the last node / adding a new node.
+     *
+     * @param nodes Topology.
+     * @param prevAssignment Previous affinity.
+     * @param iter Number of iteration.
+     * @param backups Backups count.
+     * @return Affinity context.
+     */
+    private GridAffinityFunctionContextImpl nodesModificationChangeLast(List<ClusterNode> nodes,
+        List<List<ClusterNode>> prevAssignment, int iter, int backups) {
+        DiscoveryEvent discoEvt;
+
+        discoEvt = iter % 2 == 0 ? addNode(nodes, iter) : removeNode(nodes, nodes.size() - 1);
+
+        return new GridAffinityFunctionContextImpl(nodes,
+            prevAssignment, discoEvt, new AffinityTopologyVersion(nodes.size()), backups);
+    }
+
+    /**
+     * @param nodes Topology.
+     * @param idx Index of node to remove.
+     * @return Discovery event.
+     */
+    @NotNull private DiscoveryEvent removeNode(List<ClusterNode> nodes, int idx) {
+        return new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_LEFT, nodes.remove(idx));
+    }
+
+    /**
+     * Modify the topology by removing the first node / adding a new node.
+     *
+     * @param nodes Topology.
+     * @param prevAssignment Previous affinity.
+     * @param iter Number of iteration.
+     * @param backups Backups count.
+     * @return Affinity context.
+     */
+    private GridAffinityFunctionContextImpl nodesModificationChangeFirst(List<ClusterNode> nodes,
+        List<List<ClusterNode>> prevAssignment, int iter, int backups) {
+        DiscoveryEvent discoEvt;
+
+        discoEvt = iter % 2 == 0 ? addNode(nodes, iter) : removeNode(nodes, 0);
+
+        return new GridAffinityFunctionContextImpl(nodes,
+            prevAssignment, discoEvt, new AffinityTopologyVersion(nodes.size()), backups);
+    }
+
+    /**
+     * @param nodes Topology.
+     * @param iter Iteration count.
+     * @return Discovery event.
+     */
+    @NotNull private DiscoveryEvent addNode(List<ClusterNode> nodes, int iter) {
+        GridTestNode node = new GridTestNode(UUID.randomUUID());
+
+        // Added nodes are grouped into neighbours by a shared MAC.
+        node.setAttribute(IgniteNodeAttributes.ATTR_MACS, MAC_PREF + "_add_" + iter / 4);
+
+        nodes.add(node);
+
+        return new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_JOINED, node);
+    }
+
+    /**
+     *
+     * @param aff Affinity function.
+     * @param nodes Topology.
+     * @param iter Number of iteration.
+     * @param prevAssignment Previous affinity assignment.
+     * @param backups Backups count.
+     * @return Tuple with the affinity and the time spent on the affinity calculation.
+     */
+    private IgniteBiTuple<Long, List<List<ClusterNode>>> assignPartitions(AffinityFunction aff,
+        List<ClusterNode> nodes, List<List<ClusterNode>> prevAssignment, int backups, int iter) {
+
+        GridAffinityFunctionContextImpl ctx = null;
+        switch (mode) {
+            case CHANGE_LAST_NODE:
+                ctx = nodesModificationChangeLast(nodes, prevAssignment, iter, backups);
+                break;
+            case CHANGE_FIRST_NODE:
+                ctx = nodesModificationChangeFirst(nodes, prevAssignment, iter, backups);
+                break;
+
+            case ADD:
+                ctx = new GridAffinityFunctionContextImpl(nodes,
+                    prevAssignment, addNode(nodes, iter), new AffinityTopologyVersion(nodes.size()), backups);
+                break;
+
+            case REMOVE_RANDOM:
+                ctx = new GridAffinityFunctionContextImpl(nodes,
+                    prevAssignment, removeNode(nodes, nodes.size() - 1),
+                    new AffinityTopologyVersion(nodes.size()), backups);
+                break;
+
+            case NONE:
+                ctx = new GridAffinityFunctionContextImpl(nodes,
+                    prevAssignment,
+                    new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_JOINED, nodes.get(nodes.size() - 1)),
+                    new AffinityTopologyVersion(nodes.size()), backups);
+                break;
+
+        }
+
+        long start = System.currentTimeMillis();
+
+        List<List<ClusterNode>> assignments = aff.assignPartitions(ctx);
+
+        return F.t(System.currentTimeMillis() - start, assignments);
+    }
+
+    /**
+     * @param lst List of measures.
+     * @return Average of measures.
+     */
+    private double average(Collection<Long> lst) {
+        if (lst.isEmpty())
+            return 0;
+
+        long sum = 0;
+
+        for (long l : lst)
+            sum += l;
+
+        return (double)sum / lst.size();
+    }
+
+    /**
+     * @param lst List of measures.
+     * @param avg Average of the measures.
+     * @return Standard deviation of the measures (square root of the variance).
+     */
+    private double variance(Collection<Long> lst, double avg) {
+        if (lst.isEmpty())
+            return 0;
+
+        long sum = 0;
+
+        for (long l : lst)
+            sum += (l - avg) * (l - avg);
+
+        return Math.sqrt((double)sum / lst.size());
+    }
+
+    /**
+     * Builds the table with counts of partitions per node:
+     *
+     * column 0 - primary partitions counts
+     * column 1 - backup#0 partitions counts
+     * etc
+     *
+     * Rows correspond to the nodes, in iteration order of {@code nodes}.
+     *
+     * @param lst Affinity result.
+     * @param nodes Topology.
+     * @return Frequency distribution: counts of partitions on node.
+     */
+    private static List<List<Integer>> freqDistribution(List<List<ClusterNode>> lst, Collection<ClusterNode> nodes) {
+        List<Map<ClusterNode, AtomicInteger>> cntsByCol = new ArrayList<>();
+
+        // Number of columns is taken from the size of the first partition's node list.
+        int cols = lst.get(0).size();
+
+        for (int col = 0; col < cols; ++col) {
+            Map<ClusterNode, AtomicInteger> cnts = new HashMap<>();
+
+            for (List<ClusterNode> owners : lst) {
+                ClusterNode owner = owners.get(col);
+                AtomicInteger cnt = cnts.get(owner);
+
+                if (cnt == null)
+                    cnts.put(owner, new AtomicInteger(1));
+                else
+                    cnt.incrementAndGet();
+            }
+
+            cntsByCol.add(cnts);
+        }
+
+        List<List<Integer>> res = new ArrayList<>(nodes.size());
+        for (ClusterNode node : nodes) {
+            List<Integer> row = new ArrayList<>(cols);
+
+            for (Map<ClusterNode, AtomicInteger> cnts : cntsByCol) {
+                AtomicInteger cnt = cnts.get(node);
+                row.add(cnt == null ? 0 : cnt.get());
+            }
+
+            res.add(row);
+        }
+        return res;
+    }
+
+    /**
+     * @param byNodes Frequency distribution.
+     * @param suffix Suffix appended to the output file name.
+     * @throws IOException On error.
+     */
+    private void printDistribution(Collection<List<Integer>> byNodes, String suffix) throws IOException {
+        // File name: zero-padded rows count followed by the suffix.
+        String fileName = String.format("%03d", byNodes.size()) + suffix;
+
+        try (PrintStream out = new PrintStream(Files.newOutputStream(FileSystems.getDefault().getPath(fileName)))) {
+            for (List<Integer> row : byNodes) {
+                for (int cnt : row)
+                    out.print(String.format("%05d ", cnt));
+
+                // One output line per row of the distribution.
+                out.println("");
+            }
+        }
+    }
+
+    /**
+     * Chi-square test of the primary partitions distribution (column 0) against the uniform distribution.
+     *
+     * @param byNodes Distribution.
+     * @param parts Partitions count.
+     * @param goldenNodeWeight Weight of a node according to the uniform distribution.
+     * @return Chi-square test.
+     */
+    private double chiSquare(List<List<Integer>> byNodes, int parts, double goldenNodeWeight) {
+        double chi2 = 0;
+
+        for (List<Integer> row : byNodes) {
+            double dev = goldenNodeWeight - (double)row.get(0) / parts;
+
+            chi2 += dev * dev / goldenNodeWeight;
+        }
+        return chi2;
+    }
+
+    /**
+     * @throws IOException On error.
+     */
+    public void testDistribution() throws IOException {
+        AffinityFunction affOld = new RendezvousAffinityFunctionOld(true, 1024);
+
+        GridTestUtils.setFieldValue(affOld, "ignite", ignite);
+
+        AffinityFunction affNew = new RendezvousAffinityFunction(true, 1024);
+
+        affinityDistribution(affNew, affOld);
+    }
+
+    /**
+     * Logs the chi-square of both functions for several topology sizes and dumps the distributions to files.
+     * @param aff0 Affinity function to compare.
+     * @param aff1 Affinity function to compare.
+     */
+    private void affinityDistribution(AffinityFunction aff0, AffinityFunction aff1) {
+        int[] nodesCnts = {5, 64, 100, 128, 200, 256, 300, 400, 500, 600};
+
+        for (int nodesCnt : nodesCnts) {
+            List<ClusterNode> nodes0 = createBaseNodes(nodesCnt);
+            List<ClusterNode> nodes1 = createBaseNodes(nodesCnt);
+
+            assignPartitions(aff0, nodes0, null, 2, 0).get2();
+            List<List<ClusterNode>> lst0 = assignPartitions(aff0, nodes0, null, 2, 1).get2();
+
+            assignPartitions(aff1, nodes1, null, 2, 0).get2();
+            List<List<ClusterNode>> lst1 = assignPartitions(aff1, nodes1, null, 2, 1).get2();
+
+            List<List<Integer>> dist0 = freqDistribution(lst0, nodes0);
+            List<List<Integer>> dist1 = freqDistribution(lst1, nodes1);
+            // Each distribution is normalized by the partitions count of the function that produced it.
+            info(String.format("Chi^2. Test %d nodes. %s: %f; %s: %f;",
+                nodesCnt,
+                aff0.getClass().getSimpleName(),
+                chiSquare(dist0, aff0.partitions(), 1.0 / nodesCnt),
+                aff1.getClass().getSimpleName(),
+                chiSquare(dist1, aff1.partitions(), 1.0 / nodesCnt)));
+
+            try {
+                printDistribution(dist0, "." + aff0.getClass().getSimpleName());
+                printDistribution(dist1, "." + aff1.getClass().getSimpleName());
+            }
+            catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Benchmarks the old and the new affinity functions in {@link TopologyModificationMode#ADD} mode.
+     */
+    public void testAffinityBenchmarkAdd() {
+        mode = TopologyModificationMode.ADD;
+
+        AffinityFunction affOld = new RendezvousAffinityFunctionOld(true, 1024);
+        AffinityFunction affNew = new RendezvousAffinityFunction(true, 1024);
+
+        GridTestUtils.setFieldValue(affOld, "ignite", ignite);
+        affinityBenchmark(affOld, affNew);
+    }
+
+    /**
+     * Benchmarks the old and the new affinity functions in {@link TopologyModificationMode#CHANGE_LAST_NODE} mode.
+     */
+    public void testAffinityBenchmarkChangeLast() {
+        mode = TopologyModificationMode.CHANGE_LAST_NODE;
+
+        AffinityFunction affOld = new RendezvousAffinityFunctionOld(true, 1024);
+        AffinityFunction affNew = new RendezvousAffinityFunction(true, 1024);
+
+        GridTestUtils.setFieldValue(affOld, "ignite", ignite);
+        affinityBenchmark(affOld, affNew);
+    }
+
+    /**
+     * @param aff0 First affinity function to compare.
+     * @param aff1 Second affinity function to compare.
+     */
+    private void affinityBenchmark(AffinityFunction aff0, AffinityFunction aff1) {
+        int[] topSizes = {100, 4, 100, 200, 300, 400, 500, 600};
+
+        final int backups = 2;
+
+        for (int topSize : topSizes) {
+            List<ClusterNode> top0 = createBaseNodes(topSize);
+            List<ClusterNode> top1 = createBaseNodes(topSize);
+
+            List<Long> durations0 = new ArrayList<>(MAX_EXPERIMENTS);
+            List<Long> durations1 = new ArrayList<>(MAX_EXPERIMENTS);
+
+            // The initial assignment is not included into the measurements.
+            List<List<ClusterNode>> prev = assignPartitions(aff0, top0, null, backups, 0).get2();
+
+            for (int iter = 0; iter < MAX_EXPERIMENTS; ++iter) {
+                IgniteBiTuple<Long, List<List<ClusterNode>>> res =
+                    assignPartitions(aff0, top0, prev, backups, iter);
+
+                durations0.add(res.get1());
+
+                prev = res.get2();
+            }
+
+            prev = assignPartitions(aff1, top1, null, backups, 0).get2();
+
+            for (int iter = 0; iter < MAX_EXPERIMENTS; ++iter) {
+                IgniteBiTuple<Long, List<List<ClusterNode>>> res =
+                    assignPartitions(aff1, top1, prev, backups, iter);
+
+                durations1.add(res.get1());
+
+                prev = res.get2();
+            }
+
+            double avg0 = average(durations0);
+            double dev0 = variance(durations0, avg0);
+
+            double avg1 = average(durations1);
+            double dev1 = variance(durations1, avg1);
+
+            info(String.format("Test %d nodes. %s: %.1f ms +/- %.3f ms; %s: %.1f ms +/- %.3f ms;",
+                topSize,
+                aff0.getClass().getSimpleName(),
+                avg0, dev0,
+                aff1.getClass().getSimpleName(),
+                avg1, dev1));
+        }
+    }
+
+    /**
+     * Counts, over all partitions, the nodes present in the new assignment but absent from the old one.
+     * @param affOld Old affinity.
+     * @param affNew New affinity.
+     * @return Count of partitions to migrate.
+     */
+    private int countPartitionsToMigrate(List<List<ClusterNode>> affOld, List<List<ClusterNode>> affNew) {
+        if (affOld == null || affNew == null)
+            return 0;
+
+        assertEquals(affOld.size(), affNew.size());
+
+        int moved = 0;
+
+        for (int part = 0; part < affOld.size(); ++part) {
+            // Nodes that own the partition now but did not own it before.
+            Collection<ClusterNode> gained = new HashSet<>(affNew.get(part));
+
+            gained.removeAll(new HashSet<>(affOld.get(part)));
+
+            moved += gained.size();
+        }
+
+        return moved;
+    }
+
+    /**
+     * Logs the average number of changed partition owners per iteration for Rendezvous and Fair functions.
+     */
+    public void testPartitionsMigrate() {
+        int[] nodesCnts = {2, 3, 10, 64, 100, 200, 300, 400, 500, 600};
+
+        final int backups = 2;
+
+        AffinityFunction aff0 = new RendezvousAffinityFunction(true, 256);
+        AffinityFunction aff1 = new FairAffinityFunction(true, 256);
+
+        for (int nodesCnt : nodesCnts) {
+            List<ClusterNode> nodes0 = createBaseNodes(nodesCnt);
+            List<ClusterNode> nodes1 = createBaseNodes(nodesCnt);
+
+            // The initial assignment is the baseline, so every one of the MAX_EXPERIMENTS iterations below
+            // contributes a comparison and the totals are averaged over MAX_EXPERIMENTS.
+            int diffCnt0 = 0;
+
+            List<List<ClusterNode>> affPrev = assignPartitions(aff0, nodes0, null, backups, 0).get2();
+            for (int i = 0; i < MAX_EXPERIMENTS; ++i) {
+                List<List<ClusterNode>> affCur = assignPartitions(aff0, nodes0, affPrev, backups, i).get2();
+                diffCnt0 += countPartitionsToMigrate(affPrev, affCur);
+                affPrev = affCur;
+            }
+
+            affPrev = assignPartitions(aff1, nodes1, null, backups, 0).get2();
+            int diffCnt1 = 0;
+            for (int i = 0; i < MAX_EXPERIMENTS; ++i) {
+                List<List<ClusterNode>> affCur = assignPartitions(aff1, nodes1, affPrev, backups, i).get2();
+                diffCnt1 += countPartitionsToMigrate(affPrev, affCur);
+                affPrev = affCur;
+            }
+
+            double goldenChangeAffinity = (double)aff1.partitions() / nodesCnt * (backups + 1);
+            info(String.format("Test %d nodes. Golden: %.1f; %s: %.1f; %s: %.1f;",
+                nodesCnt, goldenChangeAffinity,
+                aff0.getClass().getSimpleName(),
+                (double)diffCnt0 / MAX_EXPERIMENTS,
+                aff1.getClass().getSimpleName(),
+                (double)diffCnt1 / MAX_EXPERIMENTS));
+        }
+    }
+
+    /**
+     * Checks that the new and the old affinity functions produce equal assignments.
+     */
+    public void _testAffinityCompatibility() {
+        mode = TopologyModificationMode.ADD;
+
+        // Use the full copy of the old implementation of the RendezvousAffinityFunction to check the compatibility.
+        AffinityFunction affOld = new RendezvousAffinityFunctionOld(true, 1024);
+        GridTestUtils.setFieldValue(affOld, "ignite", ignite);
+
+        AffinityFunction affNew = new RendezvousAffinityFunction(true, 1024);
+
+        affinityCompatibility(affNew, affOld);
+    }
+
+    /**
+     * @param aff0 Affinity function to compare.
+     * @param aff1 Affinity function to compare.
+     */
+    private void affinityCompatibility(AffinityFunction aff0, AffinityFunction aff1) {
+        int[] topSizes = {64, 100, 200, 300, 400, 500, 600};
+
+        final int backups = 2;
+
+        // NONE mode: the context is built from the node list as is.
+        mode = TopologyModificationMode.NONE;
+
+        for (int topSize : topSizes) {
+            List<ClusterNode> top = createBaseNodes(topSize);
+
+            List<List<ClusterNode>> res0 = assignPartitions(aff0, top, null, backups, 0).get2();
+            List<List<ClusterNode>> res1 = assignPartitions(aff1, top, null, backups, 0).get2();
+
+            assertEquals(res0, res1);
+        }
+    }
+
+    /**
+     * Topology modification applied when the affinity context is built for a calculation.
+     */
+    private enum TopologyModificationMode {
+        /** Change the last node. */
+        CHANGE_LAST_NODE,
+
+        /** Change the first node. */
+        CHANGE_FIRST_NODE,
+
+        /** Add. */
+        ADD,
+
+        /** Remove random. NOTE(review): the visible caller passes the last index to removeNode - confirm. */
+        REMOVE_RANDOM,
+
+        /** Do nothing. */
+        NONE
+    }
+
+    /**
+     *  Full copy of the old implementation of the RendezvousAffinityFunction to check compatibility and performance.
+     */
+    private static class RendezvousAffinityFunctionOld implements AffinityFunction, Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Default number of partitions. */
+        public static final int DFLT_PARTITION_COUNT = 1024;
+
+        /** Comparator. */
+        private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator();
+
+        /** Thread local message digest. */
+        private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() {
+            @Override protected MessageDigest initialValue() {
+                try {
+                    return MessageDigest.getInstance("MD5");
+                }
+                catch (NoSuchAlgorithmException e) {
+                    assert false : "Should have failed in constructor";
+
+                    throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", e);
+                }
+            }
+        };
+
+        /** Number of partitions. */
+        private int parts;
+
+        /** Exclude neighbors flag. */
+        private boolean exclNeighbors;
+
+        /** Exclude neighbors warning. */
+        private transient boolean exclNeighborsWarn;
+
+        /** Optional backup filter. First node is primary, second node is a node being tested. */
+        private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter;
+
+        /** Optional affinity backups filter. The first node is a node being tested,
+         *  the second is a list of nodes that are already assigned for a given partition (the first node in the list
+         *  is primary). */
+        private IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter;
+
+        /** Hash ID resolver. */
+        private AffinityNodeHashResolver hashIdRslvr = null;
+
+        /** Ignite instance. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Logger instance. */
+        @LoggerResource
+        private transient IgniteLogger log;
+
+        /**
+         * Empty constructor with all defaults.
+         */
+        public RendezvousAffinityFunctionOld() {
+            this(false);
+        }
+
+        /**
+         * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other
+         * and specified number of backups.
+         * <p>
+         * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+         *
+         * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+         *      of each other.
+         */
+        public RendezvousAffinityFunctionOld(boolean exclNeighbors) {
+            this(exclNeighbors, DFLT_PARTITION_COUNT);
+        }
+
+        /**
+         * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other,
+         * and specified number of backups and partitions.
+         * <p>
+         * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+         *
+         * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+         *      of each other.
+         * @param parts Total number of partitions.
+         */
+        public RendezvousAffinityFunctionOld(boolean exclNeighbors, int parts) {
+            this(exclNeighbors, parts, null);
+        }
+
+        /**
+         * Initializes affinity with specified number of partitions and optional backup filter.
+         * <p>
+         * Neighbors exclusion is disabled for an affinity function created with this constructor.
+         *
+         * @param parts Total number of partitions.
+         * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected
+         *      from all nodes that pass this filter. First argument for this filter is primary node, and second
+         *      argument is node being tested.
+         * <p>
+         * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+         */
+        public RendezvousAffinityFunctionOld(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+            this(false, parts, backupFilter);
+        }
+
+        /**
+         * Private constructor. Checks that {@code parts} is positive and that the MD5 digest is available.
+         *
+         * @param exclNeighbors Exclude neighbors flag.
+         * @param parts Partitions count.
+         * @param backupFilter Backup filter.
+         */
+        private RendezvousAffinityFunctionOld(boolean exclNeighbors, int parts,
+            IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+            A.ensure(parts > 0, "parts > 0");
+
+            this.exclNeighbors = exclNeighbors;
+            this.parts = parts;
+            this.backupFilter = backupFilter;
+
+            try {
+                MessageDigest.getInstance("MD5");
+            }
+            catch (NoSuchAlgorithmException e) {
+                throw new IgniteException("Failed to obtain MD5 message digest instance.", e);
+            }
+        }
+
+        /**
+         * Gets total number of key partitions. To ensure that all partitions are
+         * equally distributed across all nodes, please make sure that this
+         * number is significantly larger than a number of nodes. Also, partition
+         * size should be relatively small. Try to avoid having partitions with more
+         * than quarter million keys.
+         * <p>
+         * Note that for fully replicated caches this method should always
+         * return {@code 1}.
+         *
+         * @return Total partition count.
+         */
+        public int getPartitions() {
+            return parts;
+        }
+
+        /**
+         * Sets total number of partitions.
+         *
+         * @param parts Total number of partitions.
+         */
+        public void setPartitions(int parts) {
+            A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <= " + CacheConfiguration.MAX_PARTITIONS_COUNT);
+
+            this.parts = parts;
+        }
+
+        /**
+         * Gets hash ID resolver for nodes. This resolver is used to provide
+         * alternate hash ID, other than node ID.
+         * <p>
+         * Node IDs constantly change when nodes get restarted, which causes them to
+         * be placed on different locations in the hash ring, and hence causing
+         * repartitioning. Providing an alternate hash ID, which survives node restarts,
+         * puts node on the same location on the hash ring, hence minimizing required
+         * repartitioning.
+         *
+         * @return Hash ID resolver.
+         */
+        @Deprecated
+        public AffinityNodeHashResolver getHashIdResolver() {
+            return hashIdRslvr;
+        }
+
+        /**
+         * Sets hash ID resolver for nodes. This resolver is used to provide
+         * alternate hash ID, other than node ID.
+         * <p>
+         * Node IDs constantly change when nodes get restarted, which causes them to
+         * be placed on different locations in the hash ring, and hence causing
+         * repartitioning. Providing an alternate hash ID, which survives node restarts,
+         * puts node on the same location on the hash ring, hence minimizing required
+         * repartitioning.
+         *
+         * @param hashIdRslvr Hash ID resolver.
+         *
+         * @deprecated Use {@link IgniteConfiguration#setConsistentId(Serializable)} instead.
+         */
+        @Deprecated
+        public void setHashIdResolver(AffinityNodeHashResolver hashIdRslvr) {
+            this.hashIdRslvr = hashIdRslvr;
+        }
+
+        /**
+         * Gets optional backup filter. If not {@code null}, backups will be selected
+         * from all nodes that pass this filter. First node passed to this filter is primary node,
+         * and second node is a node being tested.
+         * <p>
+         * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+         *
+         * @return Optional backup filter.
+         */
+        @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() {
+            return backupFilter;
+        }
+
+        /**
+         * Sets optional backup filter. If provided, then backups will be selected from all
+         * nodes that pass this filter. First node being passed to this filter is primary node,
+         * and second node is a node being tested.
+         * <p>
+         * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+         *
+         * @param backupFilter Optional backup filter.
+         * @deprecated Use {@code affinityBackupFilter} instead.
+         */
+        @Deprecated
+        public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+            this.backupFilter = backupFilter;
+        }
+
+        /**
+         * Gets optional affinity backup filter. If not {@code null}, backups will be selected
+         * from all nodes that pass this filter. First node passed to this filter is a node being tested,
+         * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is
+         * the first in the list).
+         * <p>
+         * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+         *
+         * @return Optional affinity backup filter.
+         */
+        @Nullable public IgniteBiPredicate<ClusterNode, List<ClusterNode>> getAffinityBackupFilter() {
+            return affinityBackupFilter;
+        }
+
+        /**
+         * Sets optional affinity backup filter. If provided, then backups will be selected from all
+         * nodes that pass this filter. First node being passed to this filter is a node being tested,
+         * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is
+         * the first in the list).
+         * <p>
+         * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+         *
+         * @param affinityBackupFilter Optional affinity backup filter.
+         */
+        public void setAffinityBackupFilter(@Nullable IgniteBiPredicate<ClusterNode,
+            List<ClusterNode>> affinityBackupFilter) {
+            this.affinityBackupFilter = affinityBackupFilter;
+        }
+
+        /**
+         * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+         * <p>
+         * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+         *
+         * @return {@code True} if nodes residing on the same host may not act as backups of each other.
+         */
+        public boolean isExcludeNeighbors() {
+            return exclNeighbors;
+        }
+
+        /**
+         * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+         * <p>
+         * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+         *
+         * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other.
+         */
+        public void setExcludeNeighbors(boolean exclNeighbors) {
+            this.exclNeighbors = exclNeighbors;
+        }
+
+        /**
+         * Resolves node hash: uses the hash ID resolver if it is set, otherwise the node consistent ID.
+         *
+         * @param node Cluster node.
+         * @return Node hash.
+         */
+        public Object resolveNodeHash(ClusterNode node) {
+            if (hashIdRslvr != null)
+                return hashIdRslvr.resolve(node);
+            else
+                return node.consistentId();
+        }
+
+        /**
+         * Returns collection of nodes (primary first) for specified partition.
+         *
+         * @param d Message digest.
+         * @param part Partition.
+         * @param nodes Nodes.
+         * @param nodesHash Serialized nodes hashes.
+         * @param backups Number of backups.
+         * @param neighborhoodCache Neighborhood.
+         * @return Assignment.
+         */
+        public List<ClusterNode> assignPartition(MessageDigest d,
+            int part,
+            List<ClusterNode> nodes,
+            Map<ClusterNode, byte[]> nodesHash,
+            int backups,
+            @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
+            if (nodes.size() <= 1)
+                return nodes;
+
+            if (d == null)
+                d = digest.get();
+
+            List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size());
+
+            try {
+                for (int i = 0; i < nodes.size(); i++) {
+                    ClusterNode node = nodes.get(i);
+
+                    byte[] nodeHashBytes = nodesHash.get(node);
+
+                    if (nodeHashBytes == null) {
+                        Object nodeHash = resolveNodeHash(node);
+
+                        byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+
+                        // Add 4 bytes for partition bytes.
+                        nodeHashBytes = new byte[nodeHashBytes0.length + 4];
+
+                        System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length);
+
+                        nodesHash.put(node, nodeHashBytes);
+                    }
+
+                    U.intToBytes(part, nodeHashBytes, 0);
+
+                    d.reset();
+
+                    byte[] bytes = d.digest(nodeHashBytes);
+
+                    long hash =
+                        (bytes[0] & 0xFFL)
+                            | ((bytes[1] & 0xFFL) << 8)
+                            | ((bytes[2] & 0xFFL) << 16)
+                            | ((bytes[3] & 0xFFL) << 24)
+                            | ((bytes[4] & 0xFFL) << 32)
+                            | ((bytes[5] & 0xFFL) << 40)
+                            | ((bytes[6] & 0xFFL) << 48)
+                            | ((bytes[7] & 0xFFL) << 56);
+
+                    lst.add(F.t(hash, node));
+                }
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+
+            Collections.sort(lst, COMPARATOR);
+
+            int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size());
+
+            List<ClusterNode> res = new ArrayList<>(primaryAndBackups);
+
+            ClusterNode primary = lst.get(0).get2();
+
+            res.add(primary);
+
+            // Select backups.
+            if (backups > 0) {
+                for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
+                    IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
+
+                    ClusterNode node = next.get2();
+
+                    if (exclNeighbors) {
+                        Collection<ClusterNode> allNeighbors = GridCacheUtils.neighborsForNodes(neighborhoodCache, res);
+
+                        if (!allNeighbors.contains(node))
+                            res.add(node);
+                    }
+                    else if (affinityBackupFilter != null && affinityBackupFilter.apply(node, res))
+                        res.add(next.get2());
+                    else if (backupFilter != null && backupFilter.apply(primary, node))
+                        res.add(next.get2());
+                    else if (affinityBackupFilter == null && backupFilter == null)
+                        res.add(next.get2());
+                }
+            }
+
+            if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) {
+                // Need to iterate again in case if there are no nodes which pass exclude neighbors backups criteria.
+                for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
+                    IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
+
+                    ClusterNode node = next.get2();
+
+                    if (!res.contains(node))
+                        res.add(next.get2());
+                }
+
+                if (!exclNeighborsWarn) {
+                    LT.warn(log, "Affinity function excludeNeighbors property is ignored " +
+                        "because topology has no enough nodes to assign backups.");
+
+                    exclNeighborsWarn = true;
+                }
+            }
+
+            assert res.size() <= primaryAndBackups;
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partitions() {
+            return parts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partition(Object key) {
+            if (key == null)
+                throw new IllegalArgumentException("Null key is passed for a partition calculation. " +
+                    "Make sure that an affinity key that is used is initialized properly.");
+
+            return U.safeAbs(key.hashCode() % parts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+            List<List<ClusterNode>> assignments = new ArrayList<>(parts);
+
+            Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
+                GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
+
+            MessageDigest d = digest.get();
+
+            List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+
+            Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size());
+
+            for (int i = 0; i < parts; i++) {
+                List<ClusterNode> partAssignment = assignPartition(d,
+                    i,
+                    nodes,
+                    nodesHash,
+                    affCtx.backups(),
+                    neighborhoodCache);
+
+                assignments.add(partAssignment);
+            }
+
+            return assignments;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeNode(UUID nodeId) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeInt(parts);
+            out.writeBoolean(exclNeighbors);
+            out.writeObject(hashIdRslvr);
+            out.writeObject(backupFilter);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            parts = in.readInt();
+            exclNeighbors = in.readBoolean();
+            hashIdRslvr = (AffinityNodeHashResolver)in.readObject();
+            backupFilter = (IgniteBiPredicate<ClusterNode, ClusterNode>)in.readObject();
+        }
+
+        /**
+         * Orders (hash, node) tuples by hash in ascending order; equal hashes are ordered by node ID.
+         */
+        private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable {
+            /** */
+            private static final long serialVersionUID = 0L;
+
+            /** {@inheritDoc} */
+            @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) {
+                return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 :
+                    o1.get2().id().compareTo(o2.get2().id());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionStandardHashSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionStandardHashSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionStandardHashSelfTest.java
index ed47c57..cffa277 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionStandardHashSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionStandardHashSelfTest.java
@@ -17,34 +17,17 @@
 
 package org.apache.ignite.cache.affinity.rendezvous;
 
-import org.apache.ignite.Ignite;
 import org.apache.ignite.cache.affinity.AbstractAffinityFunctionSelfTest;
 import org.apache.ignite.cache.affinity.AffinityFunction;
-import org.apache.ignite.testframework.GridTestUtils;
 
 /**
  * Tests for {@link RendezvousAffinityFunction}.
  */
 public class RendezvousAffinityFunctionStandardHashSelfTest extends AbstractAffinityFunctionSelfTest {
-    /** Ignite. */
-    private static Ignite ignite;
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        ignite = startGrid();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-    }
-
     /** {@inheritDoc} */
     @Override protected AffinityFunction affinityFunction() {
         AffinityFunction aff = new RendezvousAffinityFunction(513, null);
 
-        GridTestUtils.setFieldValue(aff, "ignite", ignite);
-
         return aff;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 01aa256..dff827d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -101,6 +101,9 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
     private static final String STATIC_CACHE = "static-cache";
 
     /** */
+    private static final int CACHE_PUTS_CNT = 3;
+
+    /** */
     private UUID nodeId;
 
     /** {@inheritDoc} */
@@ -580,17 +583,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
             new CI1<IgniteCache<Object, Object>>() {
                 @Override public void apply(IgniteCache<Object, Object> cache) {
                     try (Transaction tx = client.transactions().txStart(txConcurrency, REPEATABLE_READ)) {
-                        log.info("Put1: " + key);
-
-                        cache.put(key, key);
-
-                        Integer key2 = key + 1;
-
-                        log.info("Put2: " + key2);
-
-                        cache.put(key2, key2);
-
-                        log.info("Commit [key1=" + key + ", key2=" + key2 + ']');
+                        for (int i = 0; i < CACHE_PUTS_CNT; ++i)
+                            cache.put(key + i, key + i);
 
                         tx.commit();
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryEnumsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryEnumsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryEnumsSelfTest.java
index ed473d8..6cac96c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryEnumsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryEnumsSelfTest.java
@@ -114,6 +114,8 @@ public class BinaryEnumsSelfTest extends GridCommonAbstractTest {
         node2 = startGrid(1);
         cache2 = node2.cache(CACHE_NAME);
         cacheBinary2 = cache2.withKeepBinary();
+
+        awaitPartitionMapExchange();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
deleted file mode 100644
index 2d46cf4..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.testframework.GridTestNode;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- *
- */
-public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTest {
-    /** */
-    public static final int NODES_CNT = 50;
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPartitionSpreading() throws Exception {
-        System.out.printf("%6s, %6s, %6s, %6s, %8s\n", "Nodes", "Reps", "Min", "Max", "Dev");
-
-        for (int i = 5; i < NODES_CNT; i = i * 3 / 2) {
-            for (int replicas = 128; replicas <= 4096; replicas*=2) {
-                Collection<ClusterNode> nodes = createNodes(i, replicas);
-
-                RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 10000);
-
-                checkDistribution(aff, nodes);
-            }
-
-            System.out.println();
-        }
-    }
-
-    /**
-     * @param nodesCnt Nodes count.
-     * @param replicas Value of
-     * @return Collection of test nodes.
-     */
-    private Collection<ClusterNode> createNodes(int nodesCnt, int replicas) {
-        Collection<ClusterNode> nodes = new ArrayList<>(nodesCnt);
-
-        for (int i = 0; i < nodesCnt; i++)
-            nodes.add(new TestRichNode(replicas));
-
-        return nodes;
-    }
-
-    /**
-     * @param aff Affinity to check.
-     * @param nodes Collection of nodes to test on.
-     */
-    private void checkDistribution(RendezvousAffinityFunction aff, Collection<ClusterNode> nodes) {
-        Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size());
-
-        for (int part = 0; part < aff.getPartitions(); part++) {
-            Collection<ClusterNode> affNodes = aff.assignPartition(null,
-                part,
-                new ArrayList<>(nodes),
-                new HashMap<ClusterNode, byte[]>(),
-                0,
-                null);
-
-            assertEquals(1, affNodes.size());
-
-            ClusterNode node = F.first(affNodes);
-
-            parts.put(node, parts.get(node) != null ? parts.get(node) + 1 : 1);
-        }
-
-        int min = Integer.MAX_VALUE;
-        int max = Integer.MIN_VALUE;
-        int total = 0;
-
-        float mean = 0;
-        float m2 = 0;
-        int n = 0;
-
-        for (ClusterNode node : nodes) {
-            int partsCnt = parts.get(node) != null ? parts.get(node) : 0;
-
-            total += partsCnt;
-
-            if (partsCnt < min)
-                min = partsCnt;
-
-            if (partsCnt > max)
-                max = partsCnt;
-
-            n++;
-            float delta = partsCnt - mean;
-            mean += delta / n;
-            m2 += delta * (partsCnt - mean);
-        }
-
-        m2 /= (n - 1);
-        assertEquals(aff.getPartitions(), total);
-
-        System.out.printf("%6s, %6s, %6s, %8.4f\n", nodes.size(),min, max, Math.sqrt(m2));
-    }
-
-    /**
-     * Rich node stub to use in emulated server topology.
-     */
-    private static class TestRichNode extends GridTestNode {
-        /** */
-        private final UUID nodeId;
-
-        /** */
-        private final int replicas;
-
-        /**
-         * Externalizable class requires public no-arg constructor.
-         */
-        @SuppressWarnings("UnusedDeclaration")
-        private TestRichNode(int replicas) {
-            this(UUID.randomUUID(), replicas);
-        }
-
-        /**
-         * Constructs rich node stub to use in emulated server topology.
-         *
-         * @param nodeId Node id.
-         */
-        private TestRichNode(UUID nodeId, int replicas) {
-            this.nodeId = nodeId;
-            this.replicas = replicas;
-        }
-
-        /**
-         * Unused constructor for externalizable support.
-         */
-        public TestRichNode() {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public UUID id() {
-            return nodeId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public <T> T attribute(String name) {
-            return super.attribute(name);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
index db11291..c17e9f7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
@@ -90,7 +90,7 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection
      * @throws Exception If failed.
      */
     public void testQueue() throws Exception {
-        final String queueName = "q";
+        final String queueName = "qq";
 
         System.out.println(U.filler(20, '\n'));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
index 9e79a27..c8568d2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
@@ -102,6 +102,8 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
         startGrid(1);
         startGrid(2);
 
+        awaitPartitionMapExchange();
+
         final PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener();
 
         ignite(2).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);

http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
index f1c791e..47d54d5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
@@ -71,13 +71,13 @@ public class GridCacheNearTxForceKeyTest extends GridCommonAbstractTest {
 
         Ignite ignite1 = startGrid(1);
 
+        awaitPartitionMapExchange();
+
         // This key should become primary for ignite1.
-        final Integer key = ignite0.configuration().getMarshaller() instanceof OptimizedMarshaller ? 2 : 7;
+        final Integer key = primaryKey(ignite1.cache(null));
 
         assertNull(cache.getAndPut(key, key));
 
-        awaitPartitionMapExchange();
-
         assertTrue(ignite0.affinity(null).isPrimary(ignite1.cluster().localNode(), key));
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java
index 61ee9ea..eebafed 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java
@@ -52,7 +52,7 @@ public class GridCacheRebalancingPartitionDistributionTest extends GridRollingRe
                 .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
                 .setCacheMode(CacheMode.PARTITIONED)
                 .setBackups(1)
-                .setAffinity(new RendezvousAffinityFunction(true /* machine-safe */, 271))
+                .setAffinity(new RendezvousAffinityFunction(true /* machine-safe */, 1024))
                 .setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.CLOCK)
                 .setRebalanceMode(CacheRebalanceMode.SYNC)
                 .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);

http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java
index 0e33650..c0f836b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java
@@ -25,6 +25,8 @@ import java.io.Serializable;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.cache.configuration.Factory;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteServices;
 import org.apache.ignite.binary.BinaryObjectException;
 import org.apache.ignite.binary.BinaryReader;
@@ -109,7 +111,12 @@ public class IgniteServiceConfigVariationsFullApiTest extends IgniteConfigVariat
             @Override public void run(IgniteServices services, String svcName, TestService svc) {
                 IgniteCache<Object, Object> cache = grid(testedNodeIdx).getOrCreateCache(CACHE_NAME);
 
-                services.deployKeyAffinitySingleton(svcName, (Service)svc, cache.getName(), "1");
+                try {
+                    services.deployKeyAffinitySingleton(svcName, (Service)svc, cache.getName(), primaryKey(cache));
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
             }
         }));
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java
index 026e6a6..fb6c0f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java
@@ -83,7 +83,7 @@ public class IgniteServiceDynamicCachesSelfTest extends GridCommonAbstractTest {
 
             final String svcName = "myService";
 
-            svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, "key");
+            svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, primaryKey(ig.cache(cacheName)));
 
             boolean res = GridTestUtils.waitForCondition(new PA() {
                 @Override public boolean apply() {
@@ -125,7 +125,15 @@ public class IgniteServiceDynamicCachesSelfTest extends GridCommonAbstractTest {
 
         final String svcName = "myService";
 
-        svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, "key");
+        ig.createCache(ccfg);
+
+        Object key = primaryKey(ig.cache(cacheName));
+
+        ig.destroyCache(cacheName);
+
+        awaitPartitionMapExchange();
+
+        svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, key);
 
         assert svcs.service(svcName) == null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
index 09d4765..d331387 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
@@ -53,8 +53,8 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod
     /** */
     private UUID id;
 
-    /** */
-    private Object consistentId = consistentIdCtr.incrementAndGet();
+    /** A String consistent ID is closer to real-world usage than an Integer one. */
+    private Object consistentId = "Node_" + consistentIdCtr.incrementAndGet();
 
     /** */
     private ClusterMetrics metrics;
@@ -247,13 +247,5 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod
     /** {@inheritDoc} */
     @Override public String toString() {
         return id.toString();
-//        StringBuilder buf = new StringBuilder();
-//
-//        buf.append(getClass().getSimpleName());
-//        buf.append(" [attrs=").append(attrs);
-//        buf.append(", id=").append(id);
-//        buf.append(']');
-//
-//        return buf.toString();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java
index 7f59a4b..71e737f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java
@@ -21,9 +21,11 @@ import java.util.Arrays;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -157,10 +159,15 @@ public class IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest extends Igni
             affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
                 @Override public void run() {
                     for (int i = 0; i < PARTS_CNT; ++i) {
-                        grid(0).compute().affinityRun(
-                            Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
-                            new Integer(i),
-                            new NotReservedCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT, cacheName));
+                        try {
+                            grid(0).compute().affinityRun(
+                                Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                                new Integer(i),
+                                new NotReservedCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT, cacheName));
+                        }
+                        catch (IgniteException e) {
+                            checkException(e, ClusterTopologyException.class);
+                        }
                     }
                 }
             }, AFFINITY_THREADS_CNT, "affinity-run");
@@ -204,10 +211,15 @@ public class IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest extends Igni
                         if (System.currentTimeMillis() >= endTime)
                             break;
 
-                        grid(0).compute().affinityRun(
-                            Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
-                            new Integer(i),
-                            new ReservedPartitionCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT));
+                        try {
+                            grid(0).compute().affinityRun(
+                                Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                                new Integer(i),
+                                new ReservedPartitionCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT));
+                        }
+                        catch (IgniteException e) {
+                            checkException(e, ClusterTopologyException.class);
+                        }
                     }
                 }
             }, AFFINITY_THREADS_CNT, "affinity-run");
@@ -229,6 +241,24 @@ public class IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest extends Igni
         }
     }
 
+
+    /**
+     * Rethrows {@code e} unless some exception in its cause chain is an instance of {@code exCls}.
+     * @param e Exception to check.
+     * @param exCls Expected exception cause class (subclasses are accepted as well).
+     */
+    private void checkException(IgniteException e, Class<? extends Exception> exCls) {
+        for (Throwable t = e; t.getCause() != null; t = t.getCause()) {
+            if (exCls.isInstance(t.getCause())) {
+                log.info("Expected exception: " + e);
+
+                return;
+            }
+        }
+
+        throw e;
+    }
+
     /** */
     private static class NotReservedCacheOpAffinityRun implements IgniteRunnable {
         /** Org id. */