You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/28 14:49:59 UTC

[41/41] ignite git commit: ignite-1093

ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d89f1b0a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d89f1b0a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d89f1b0a

Branch: refs/heads/ignite-1093
Commit: d89f1b0af5ed23bc0f6e68fa9f0d377e6be41fcc
Parents: a7ddb62
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Aug 28 15:37:14 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Aug 28 15:37:14 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |   1 +
 .../ignite/internal/IgniteNodeAttributes.java   |   3 +
 .../GridCachePartitionExchangeManager.java      |   3 +-
 .../dht/preloader/GridDhtPartitionDemander.java | 709 ++++++++++++++++---
 .../dht/preloader/GridDhtPartitionSupplier.java | 290 +++++++-
 .../dht/preloader/GridDhtPreloader.java         |  10 +-
 ...ridCacheMassiveRebalancingAsyncSelfTest.java |  91 ---
 ...GridCacheMassiveRebalancingSyncSelfTest.java | 392 ----------
 .../GridCacheRebalancingAsyncSelfTest.java      |  85 +++
 .../GridCacheRebalancingSyncSelfTest.java       | 269 +++++++
 10 files changed, 1267 insertions(+), 586 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 1db73bf..03110c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1170,6 +1170,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         add(ATTR_MARSHALLER, cfg.getMarshaller().getClass().getName());
         add(ATTR_USER_NAME, System.getProperty("user.name"));
         add(ATTR_GRID_NAME, gridName);
+        add(REBALANCING_VERSION, 1);
 
         add(ATTR_PEER_CLASSLOADING, cfg.isPeerClassLoadingEnabled());
         add(ATTR_DEPLOYMENT_MODE, cfg.getDeploymentMode());

http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index 10b8df0..c04c69b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -135,6 +135,9 @@ public final class IgniteNodeAttributes {
     /** Node consistent id. */
     public static final String ATTR_NODE_CONSISTENT_ID = ATTR_PREFIX + ".consistent.id";
 
+    /** Rebalancing version id. */
+    public static final String REBALANCING_VERSION = ATTR_PREFIX + ".rebalancing.version";
+
     /**
      * Enforces singleton.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 003e8db..bf77d1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -282,7 +282,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                 cctx.io().addOrderedHandler(demanderTopic(cnt), new CI2<UUID, GridDhtPartitionSupplyMessageV2>() {
                     @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessageV2 m) {
-                        enterBusy();
+                        if (!enterBusy())
+                            return;
 
                         try {
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 0474bf9..0aa30b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -36,12 +36,17 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
 import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
 
+import static java.util.concurrent.TimeUnit.*;
 import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 import static org.apache.ignite.internal.processors.dr.GridDrType.*;
 
@@ -69,13 +74,17 @@ public class GridDhtPartitionDemander {
     /** Last exchange future. */
     private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
 
+    /** Demand lock. */
+    private final ReadWriteLock demandLock;
+
     /**
      * @param cctx Cache context.
      */
-    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) {
+    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock demandLock) {
         assert cctx != null;
 
         this.cctx = cctx;
+        this.demandLock = demandLock;
 
         log = cctx.logger(getClass());
 
@@ -199,7 +208,13 @@ public class GridDhtPartitionDemander {
             else
                 fut.init(assigns);
 
-            if (assigns.isEmpty() || topologyChanged(topVer)) {
+            if (assigns.isEmpty()) {
+                fut.onDone();
+
+                return;
+            }
+
+            if (topologyChanged(topVer)) {
                 fut.onCancel();
 
                 return;
@@ -225,13 +240,17 @@ public class GridDhtPartitionDemander {
                             }
                         }
                         catch (IgniteInterruptedCheckedException ignored) {
-                            if (log.isDebugEnabled())
+                            if (log.isDebugEnabled()) {
                                 log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
                                     "[cacheName=" + cctx.name() + ']');
+                                cSF.onCancel();
 
-                            return;
+                                return;
+                            }
                         }
                         catch (IgniteCheckedException e) {
+                            cSF.onCancel();
+
                             throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
                         }
                     }
@@ -257,18 +276,22 @@ public class GridDhtPartitionDemander {
                             }
                         }
                         catch (IgniteInterruptedCheckedException ignored) {
-                            if (log.isDebugEnabled())
+                            if (log.isDebugEnabled()) {
                                 log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
                                     "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
+                                cSF.onCancel();
 
-                            return;
+                                return;
+                            }
                         }
                         catch (IgniteCheckedException e) {
+                            cSF.onCancel();
+
                             throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
                         }
                     }
 
-                    requestPartitions(assigns, topVer, cSF);
+                    requestPartitions(cSF);
                 }
             }).start();
 
@@ -300,12 +323,13 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @param assigns Assigns.
+     * @param fut Future.
      */
-    private void requestPartitions(
-        final GridDhtPreloaderAssignments assigns,
-        AffinityTopologyVersion topVer,
-        SyncFuture fut) {
+    private void requestPartitions(SyncFuture fut) {
+        final GridDhtPreloaderAssignments assigns = fut.assigns;
+
+        AffinityTopologyVersion topVer = fut.topologyVersion();
+
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
             if (topologyChanged(topVer)) {
                 fut.onCancel();
@@ -313,73 +337,77 @@ public class GridDhtPartitionDemander {
                 return;
             }
 
+            final ClusterNode node = e.getKey();
+
             GridDhtPartitionDemandMessage d = e.getValue();
 
             d.timeout(cctx.config().getRebalanceTimeout());
             d.workerId(0);//old api support.
 
-            final ClusterNode node = e.getKey();
+            final CacheConfiguration cfg = cctx.config();
 
             final long start = U.currentTimeMillis();
 
-            final CacheConfiguration cfg = cctx.config();
+            fut.logStart(node.id(), start);
 
-            final AffinityTopologyVersion top = d.topologyVersion();
+            U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+                ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
 
-            if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
-                U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
-                    ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
+            //Check remote node rebalancing API version.
+            if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) {
+                GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
 
-                fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                    @Override public void apply(IgniteInternalFuture<Boolean> t) {
-                        Boolean completed = ((SyncFuture)t).isCompleted();
-                        U.log(log, (!completed ? "Cancelled" : "Completed") + " rebalancing [cache=" + cctx.name() + ", mode="
-                            + cfg.getRebalanceMode() + ", from node=" + node.id() + ", topology=" + top +
-                            ", time=" + (U.currentTimeMillis() - start) + " ms]");
-                    }
-                });
-            }
+                remainings.addAll(d.partitions());
 
-            GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
+                fut.append(node.id(), remainings);
 
-            remainings.addAll(d.partitions());
+                int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2);
 
-            fut.append(node.id(), remainings);
+                List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
 
-            int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2);
+                for (int cnt = 0; cnt < lsnrCnt; cnt++)
+                    sParts.add(new HashSet<Integer>());
 
-            List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+                Iterator<Integer> it = d.partitions().iterator();
 
-            for (int cnt = 0; cnt < lsnrCnt; cnt++)
-                sParts.add(new HashSet<Integer>());
+                int cnt = 0;
 
-            Iterator<Integer> it = d.partitions().iterator();
+                while (it.hasNext())
+                    sParts.get(cnt++ % lsnrCnt).add(it.next());
 
-            int cnt = 0;
+                for (cnt = 0; cnt < lsnrCnt; cnt++) {
 
-            while (it.hasNext())
-                sParts.get(cnt++ % lsnrCnt).add(it.next());
+                    if (!sParts.get(cnt).isEmpty()) {
 
-            for (cnt = 0; cnt < lsnrCnt; cnt++) {
+                        // Create copy.
+                        GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
 
-                if (!sParts.get(cnt).isEmpty()) {
+                        initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt));
 
-                    // Create copy.
-                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+                        try {
+                            if (!topologyChanged(topVer))
+                                cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
+                            else
+                                fut.onCancel();
+                        }
+                        catch (IgniteCheckedException ex) {
+                            fut.onCancel();
 
-                    initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt));
+                            U.error(log, "Failed to send partition demand message to node", ex);
+                        }
 
-                    try {
-                        cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
-                    }
-                    catch (IgniteCheckedException ex) {
-                        U.error(log, "Failed to send partition demand message to local node", ex);
+                        if (log.isDebugEnabled())
+                            log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]");
                     }
-
-                    if (log.isDebugEnabled())
-                        log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]");
                 }
             }
+            else {
+                DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut);
+
+                fut.append(node.id(), d.partitions());
+
+                dw.run(node, d);
+            }
         }
     }
 
@@ -445,7 +473,7 @@ public class GridDhtPartitionDemander {
         final SyncFuture fut = syncFut;
 
         if (topologyChanged(topVer)) {
-            fut.onCancel(id, topVer);
+            fut.onCancel();
 
             return;
         }
@@ -462,7 +490,7 @@ public class GridDhtPartitionDemander {
             if (log.isDebugEnabled())
                 log.debug("Class got undeployed during preloading: " + supply.classError());
 
-            fut.onCancel(id, topVer);
+            fut.onCancel(id);
 
             return;
         }
@@ -515,7 +543,7 @@ public class GridDhtPartitionDemander {
                             if (last) {
                                 top.own(part);
 
-                                fut.onPartitionDone(id, p, topVer);
+                                fut.onPartitionDone(id, p);
 
                                 if (log.isDebugEnabled())
                                     log.debug("Finished rebalancing partition: " + part);
@@ -527,14 +555,14 @@ public class GridDhtPartitionDemander {
                         }
                     }
                     else {
-                        fut.onPartitionDone(id, p, topVer);
+                        fut.onPartitionDone(id, p);
 
                         if (log.isDebugEnabled())
                             log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
                     }
                 }
                 else {
-                    fut.onPartitionDone(id, p, topVer);
+                    fut.onPartitionDone(id, p);
 
                     if (log.isDebugEnabled())
                         log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
@@ -544,10 +572,10 @@ public class GridDhtPartitionDemander {
             // Only request partitions based on latest topology version.
             for (Integer miss : supply.missed())
                 if (cctx.affinity().localNode(miss, topVer))
-                    fut.onMissedPartition(id, miss, topVer);
+                    fut.onMissedPartition(id, miss);
 
             for (Integer miss : supply.missed())
-                fut.onPartitionDone(id, miss, topVer);
+                fut.onPartitionDone(id, miss);
 
             if (!fut.isDone()) {
 
@@ -569,15 +597,15 @@ public class GridDhtPartitionDemander {
         }
         catch (ClusterTopologyCheckedException e) {
             if (log.isDebugEnabled())
-                log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
+                log.debug("Node left during rebalancing [node=" + node.id() +
                     ", msg=" + e.getMessage() + ']');
-            fut.onCancel(id, topVer);
+            fut.onCancel();
         }
         catch (IgniteCheckedException ex) {
             U.error(log, "Failed to receive partitions from node (rebalancing will not " +
                 "fully finish) [node=" + node.id() + ", msg=" + supply + ']', ex);
 
-            fut.onCancel(id, topVer);
+            fut.onCancel(node.id());
         }
     }
 
@@ -687,6 +715,10 @@ public class GridDhtPartitionDemander {
 
         private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
 
+        private ConcurrentHashMap8<UUID, Long> started = new ConcurrentHashMap8<>();
+
+        private Lock lock = new ReentrantLock();
+
         private volatile GridLocalEventListener lsnr;
 
         /** Assignments. */
@@ -694,14 +726,23 @@ public class GridDhtPartitionDemander {
 
         private volatile boolean completed = true;
 
+        /**
+         * @param assigns Assigns.
+         */
         SyncFuture(GridDhtPreloaderAssignments assigns) {
             this.assigns = assigns;
         }
 
+        /**
+         * @return Topology version of the current assignments, or {@code null} if assignments are not set.
+         */
         public AffinityTopologyVersion topologyVersion() {
             return assigns != null ? assigns.topologyVersion() : null;
         }
 
+        /**
+         * @param assigns Assigns.
+         */
         void init(GridDhtPreloaderAssignments assigns) {
             final SyncFuture fut = this;
 
@@ -716,80 +757,157 @@ public class GridDhtPartitionDemander {
             this.assigns = assigns;
         }
 
+        /**
+         * @return {@code true} if assignments have been set.
+         */
         boolean isInited() {
             return assigns != null;
         }
 
+        /**
+         * @param nodeId Node id.
+         * @param parts Parts.
+         */
         void append(UUID nodeId, Collection<Integer> parts) {
             remaining.put(nodeId, parts);
 
             missed.put(nodeId, new GridConcurrentHashSet<Integer>());
         }
 
+        /**
+         * @param nodeId Node id.
+         * @param time Time.
+         */
+        void logStart(UUID nodeId, long time) {
+            started.put(nodeId, time);
+        }
+
+        /**
+         * @param topVer Topology version.
+         * @param node Node.
+         */
         GridDhtPartitionDemandMessage getDemandMessage(AffinityTopologyVersion topVer, ClusterNode node) {
-            if (!topVer.equals(assigns.topologyVersion()))
+            if (isDone() || !topVer.equals(assigns.topologyVersion()))
                 return null;
 
             return assigns.get(node);
         }
 
+        /**
+         * Cancels rebalancing from all nodes; does nothing if this future is already done.
+         */
         void onCancel() {
-            remaining.clear();
+            lock.lock();
+            try {
+                if (isDone())
+                    return;
+
+                remaining.clear();
 
-            completed = false;
+                completed = false;
 
-            checkIsDone();
+                U.log(log, (!completed ? "Cancelled" : "Completed") + " rebalancing from all nodes [cache=" + cctx.name()
+                    + ", topology=" + topologyVersion() +
+                    ", time=" +
+                    (started.isEmpty() ? 0 : (U.currentTimeMillis() - Collections.min(started.values()))) + " ms]");
+
+                checkIsDone();
+            }
+            finally {
+                lock.unlock();
+            }
         }
 
-        void onCancel(UUID nodeId, AffinityTopologyVersion topVer) {
-            if (isDone() || !topVer.equals(assigns.topologyVersion()))
-                return;
+        /**
+         * @param nodeId Node id.
+         */
+        void onCancel(UUID nodeId) {
+            lock.lock();
+            try {
+                if (isDone())
+                    return;
 
-            remaining.remove(nodeId);
+                remaining.remove(nodeId);
 
-            completed = false;
+                completed = false;
+
+                U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+                    ", from node=" + nodeId + ", topology=" + topologyVersion() +
+                    ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]"));
+
+                checkIsDone();
+            }
+            finally {
+                lock.unlock();
+            }
 
-            checkIsDone();
         }
 
+        /**
+         * @return Is completed.
+         */
         boolean isCompleted() {
             return completed;
         }
 
-        void onMissedPartition(UUID nodeId, int p, AffinityTopologyVersion topVer) {
-            if (isDone() || !topVer.equals(assigns.topologyVersion()))
-                return;
+        /**
+         * @param nodeId Node id.
+         * @param p Partition number.
+         */
+        void onMissedPartition(UUID nodeId, int p) {
+            lock.lock();
+            try {
+                if (isDone())
+                    return;
 
-            if (missed.get(nodeId) == null)
-                missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+                if (missed.get(nodeId) == null)
+                    missed.put(nodeId, new GridConcurrentHashSet<Integer>());
 
-            missed.get(nodeId).add(p);
+                missed.get(nodeId).add(p);
+            }
+            finally {
+                lock.unlock();
+            }
         }
 
-        void onPartitionDone(UUID nodeId, int p, AffinityTopologyVersion topVer) {
-            if (isDone() || !topVer.equals(assigns.topologyVersion()))
-                return;
+        /**
+         * @param nodeId Node id.
+         * @param p Partition number.
+         */
+        void onPartitionDone(UUID nodeId, int p) {
+            lock.lock();
+            try {
+                if (isDone())
+                    return;
 
-            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
-                preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
-                    assigns.exchangeFuture().discoveryEvent());
+                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                    preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+                        assigns.exchangeFuture().discoveryEvent());
 
-            Collection<Integer> parts = remaining.get(nodeId);
+                Collection<Integer> parts = remaining.get(nodeId);
 
-            if (parts != null) {
-                parts.remove(p);
+                if (parts != null) {
+                    parts.remove(p);
 
-                if (parts.isEmpty()) {
-                    remaining.remove(nodeId);
+                    if (parts.isEmpty()) {
+                        remaining.remove(nodeId);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Completed full partition iteration for node [nodeId=" + nodeId + ']');
+                        U.log(log, ("Completed rebalancing [cache=" + cctx.name() +
+                            ", from node=" + nodeId + ", topology=" + topologyVersion() +
+                            ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]"));
+                    }
                 }
-            }
 
-            checkIsDone();
+                checkIsDone();
+            }
+            finally {
+                lock.unlock();
+            }
         }
 
+        /**
+         * Completes this future and clears its tracking state once no partitions remain to be rebalanced.
+         */
         private void checkIsDone() {
             if (remaining.isEmpty()) {
                 if (log.isDebugEnabled())
@@ -809,8 +927,6 @@ public class GridDhtPartitionDemander {
                     cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture());
                 }
 
-                missed.clear();
-
                 cctx.shared().exchange().scheduleResendPartitions();
 
                 if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED))
@@ -820,7 +936,412 @@ public class GridDhtPartitionDemander {
                     cctx.events().removeListener(lsnr);
 
                 onDone(completed);
+
+                missed.clear();
+                remaining.clear();
+                started.clear();
+                assigns.clear();
+            }
+        }
+    }
+
+    /**
+     * Supply message wrapper.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private static class SupplyMessage {
+        /** Sender ID. */
+        private UUID sndId;
+
+        /** Supply message. */
+        private GridDhtPartitionSupplyMessage supply;
+
+        /**
+         * Dummy constructor.
+         */
+        private SupplyMessage() {
+            // No-op.
+        }
+
+        /**
+         * @param sndId Sender ID.
+         * @param supply Supply message.
+         */
+        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
+            this.sndId = sndId;
+            this.supply = supply;
+        }
+
+        /**
+         * @return Sender ID.
+         */
+        UUID senderId() {
+            return sndId;
+        }
+
+        /**
+         * @return Message.
+         */
+        GridDhtPartitionSupplyMessage supply() {
+            return supply;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SupplyMessage.class, this);
+        }
+    }
+
+    /** DemandWorker index. */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private final AtomicInteger dmIdx = new AtomicInteger();
+
+    /**
+     *
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private class DemandWorker {
+        /** Worker ID. */
+        private int id;
+
+        /** Partition-to-node assignments. */
+        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
+
+        /** Message queue. */
+        private final LinkedBlockingDeque<SupplyMessage> msgQ =
+            new LinkedBlockingDeque<>();
+
+        /** Counter. */
+        private long cntr;
+
+        /** Hide worker logger and use cache logger instead. */
+        private IgniteLogger log = GridDhtPartitionDemander.this.log;
+
+        private volatile SyncFuture fut;
+
+        /**
+         * @param id Worker ID.
+         */
+        private DemandWorker(int id, SyncFuture fut) {
+            assert id >= 0;
+
+            this.id = id;
+            this.fut = fut;
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void addMessage(SupplyMessage msg) {
+            msgQ.offer(msg);
+        }
+
+        /**
+         * @param deque Deque to poll from.
+         * @param time Time to wait.
+         * @return Polled item.
+         * @throws InterruptedException If interrupted.
+         */
+        @Nullable private <T> T poll(BlockingQueue<T> deque, long time) throws InterruptedException {
+            return deque.poll(time, MILLISECONDS);
+        }
+
+        /**
+         * @param idx Unique index for this topic.
+         * @return Topic for partition.
+         */
+        public Object topic(long idx) {
+            return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
+        }
+
+        /**
+         * @param node Node to demand from.
+         * @param topVer Topology version.
+         * @param d Demand message.
+         * @param exchFut Exchange future.
+         * @return Missed partitions.
+         * @throws InterruptedException If interrupted.
+         * @throws ClusterTopologyCheckedException If node left.
+         * @throws IgniteCheckedException If failed to send message.
+         */
+        private Set<Integer> demandFromNode(
+            ClusterNode node,
+            final AffinityTopologyVersion topVer,
+            GridDhtPartitionDemandMessage d,
+            GridDhtPartitionsExchangeFuture exchFut
+        ) throws InterruptedException, IgniteCheckedException {
+            GridDhtPartitionTopology top = cctx.dht().topology();
+
+            cntr++; // Advance the counter so topic(cntr) below is built from a fresh value.
+
+            d.topic(topic(cntr));
+            d.workerId(id);
+
+            Set<Integer> missed = new HashSet<>();
+
+            // Get the same collection that will be sent in the message.
+            Collection<Integer> remaining = d.partitions();
+
+            if (topologyChanged(topVer))
+                return missed; // Still empty: nothing has been demanded yet.
+
+            cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
+                    addMessage(new SupplyMessage(nodeId, msg)); // Presumably feeds msgQ polled below - TODO confirm.
+                }
+            });
+
+            try {
+                boolean retry;
+
+                // Outer loop: every pass sends a demand built from the 'remaining' collection.
+                // It repeats only while 'retry' is set and the topology version is unchanged.
+                do {
+                    retry = false;
+
+                    // Create copy (built from the previous message and the 'remaining' collection).
+                    d = new GridDhtPartitionDemandMessage(d, remaining);
+
+                    long timeout = cctx.config().getRebalanceTimeout();
+
+                    d.timeout(timeout);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
+
+                    // Send demand message.
+                    cctx.io().send(node, d, cctx.ioPolicy());
+
+                    // Inner loop: consume supply messages until no partitions remain,
+                    // a retry is requested, or the topology version changes.
+                    while (!topologyChanged(topVer)) {
+                        SupplyMessage s = poll(msgQ, timeout);
+
+                        // If timed out.
+                        if (s == null) {
+                            if (msgQ.isEmpty()) { // Safety check.
+                                U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
+                                    " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
+                                    " configuration properties).");
+
+                                // Drop the handler registered for the current topic.
+                                cctx.io().removeOrderedHandler(d.topic());
+
+                                // Must create copy to be able to work with IO manager thread local caches.
+                                d = new GridDhtPartitionDemandMessage(d, remaining);
+
+                                // Create new topic.
+                                d.topic(topic(++cntr));
+
+                                // Create new ordered listener.
+                                cctx.io().addOrderedHandler(d.topic(),
+                                    new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                                        @Override public void apply(UUID nodeId,
+                                            GridDhtPartitionSupplyMessage msg) {
+                                            addMessage(new SupplyMessage(nodeId, msg));
+                                        }
+                                    });
+
+                                // Resend the demand on the new topic; the timeout is re-read from config, not enlarged.
+                                retry = true;
+
+                                break; // While.
+                            }
+                            else
+                                continue; // While.
+                        }
+
+                        // Check that message was received from expected node.
+                        if (!s.senderId().equals(node.id())) {
+                            U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
+                                ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
+
+                            continue; // While.
+                        }
+
+                        if (log.isDebugEnabled())
+                            log.debug("Received supply message: " + s);
+
+                        GridDhtPartitionSupplyMessage supply = s.supply();
+
+                        // Check whether there were class loading errors on unmarshal
+                        if (supply.classError() != null) {
+                            if (log.isDebugEnabled())
+                                log.debug("Class got undeployed during preloading: " + supply.classError());
+
+                            retry = true;
+
+                            // Leave the inner loop; since 'retry' is set the demand is sent again.
+                            break;
+                        }
+
+                        // Preload: walk the received entries partition by partition.
+                        for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+                            int p = e.getKey();
+
+                            if (cctx.affinity().localNode(p, topVer)) {
+                                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+                                assert part != null;
+
+                                if (part.state() == MOVING) {
+                                    boolean reserved = part.reserve();
+
+                                    assert reserved : "Failed to reserve partition [gridName=" +
+                                        cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+                                    part.lock();
+
+                                    try {
+                                        Collection<Integer> invalidParts = new GridLeanSet<>();
+
+                                        // Loop through all received entries and try to preload them.
+                                        for (GridCacheEntryInfo entry : e.getValue().infos()) {
+                                            if (!invalidParts.contains(p)) {
+                                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
+                                                    if (log.isDebugEnabled())
+                                                        log.debug("Preloading is not permitted for entry due to " +
+                                                            "evictions [key=" + entry.key() +
+                                                            ", ver=" + entry.version() + ']');
+
+                                                    continue;
+                                                }
+
+                                                if (!preloadEntry(node, p, entry, topVer)) {
+                                                    invalidParts.add(p);
+
+                                                    if (log.isDebugEnabled())
+                                                        log.debug("Got entries for invalid partition during " +
+                                                            "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+                                                }
+                                            }
+                                        }
+
+                                        boolean last = supply.last().contains(p);
+
+                                        // If message was last for this partition,
+                                        // then we take ownership.
+                                        if (last) {
+                                            remaining.remove(p);
+                                            fut.onPartitionDone(node.id(), p);
+
+                                            top.own(part);
+
+                                            if (log.isDebugEnabled())
+                                                log.debug("Finished rebalancing partition: " + part);
+
+                                            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                                                preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+                                                    exchFut.discoveryEvent());
+                                        }
+                                    }
+                                    finally {
+                                        part.unlock();
+                                        part.release();
+                                    }
+                                }
+                                else {
+                                    remaining.remove(p);
+                                    fut.onPartitionDone(node.id(), p);
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+                                }
+                            }
+                            else {
+                                remaining.remove(p);
+                                fut.onPartitionDone(node.id(), p);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+                            }
+                        }
+
+                        remaining.removeAll(s.supply().missed());
+
+                        // Only request partitions based on latest topology version.
+                        for (Integer miss : s.supply().missed()) {
+                            if (cctx.affinity().localNode(miss, topVer))
+                                missed.add(miss);
+
+                            fut.onMissedPartition(node.id(), miss);
+                        }
+
+                        if (remaining.isEmpty())
+                            break; // While.
+
+                        if (s.supply().ack()) {
+                            retry = true; // Supplier set the ack flag: demand again for what is still remaining.
+
+                            break;
+                        }
+                    }
+                }
+                while (retry && !topologyChanged(topVer));
+
+                return missed;
+            }
+            finally {
+                cctx.io().removeOrderedHandler(d.topic()); // Always unregister the handler of the latest topic.
+            }
+        }
+
+        /**
+         * Demands partitions from one node under the demand read lock; cancels {@code fut} on topology change or failure.
+         *
+         * @param node Node to demand partitions from.
+         * @param d Demand message to send to that node.
+         */
+        public void run(ClusterNode node, GridDhtPartitionDemandMessage d) {
+            demandLock.readLock().lock();
+
+            try {
+                GridDhtPartitionsExchangeFuture exchFut = fut.assigns.exchangeFuture();
+
+                AffinityTopologyVersion topVer = fut.assigns.topologyVersion();
+
+                if (topologyChanged(topVer)) {
+                    fut.onCancel();
+
+                    return;
+                }
+
+                try {
+                    Set<Integer> set = demandFromNode(node, topVer, d, exchFut);
+
+                    // Missed partitions are only reported here, they are not collected any further.
+                    if (!set.isEmpty() && log.isDebugEnabled())
+                        log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" +
+                            set + ']');
+                }
+                catch (ClusterTopologyCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
+                            ", msg=" + e.getMessage() + ']');
+
+                    fut.onCancel();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
+
+                    fut.onCancel(node.id());
+                }
+                catch (InterruptedException e) {
+                    fut.onCancel();
+
+                    // Restore the interrupt status that was cleared when the exception was thrown.
+                    Thread.currentThread().interrupt();
+                }
             }
+            finally {
+                demandLock.readLock().unlock();
+            }
+        }
+
+        /** {@inheritDoc} Includes the {@code assignQ} and {@code msgQ} fields and the superclass string. */
+        @Override public String toString() {
+            return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 347a394..0686376 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -25,15 +25,12 @@ import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.jsr166.*;
 
 import java.util.*;
-import java.util.concurrent.locks.*;
 
-import static org.apache.ignite.internal.GridTopic.*;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 
 /**
@@ -73,13 +70,14 @@ class GridDhtPartitionSupplier {
 
         top = cctx.dht().topology();
 
-         depEnabled = cctx.gridDeploy().enabled();
+        depEnabled = cctx.gridDeploy().enabled();
     }
 
     /**
      *
      */
     void start() {
+        startOldListeners(); // Registers the backward-compatibility demand handler (skipped on client nodes).
     }
 
     /**
@@ -463,14 +461,14 @@ class GridDhtPartitionSupplier {
         int phase,
         Iterator<Integer> partIt,
         int part,
-        Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr){
+        Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr) {
         scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
     }
 
     /**
      * Supply context.
      */
-    private static class SupplyContext{
+    private static class SupplyContext {
         /** Phase. */
         private int phase;
 
@@ -502,4 +500,284 @@ class GridDhtPartitionSupplier {
             this.part = part;
         }
     }
+
+    /** Registers the handler serving old-style demand messages for this cache; does nothing on client nodes. */
+    @Deprecated//Backward compatibility. To be removed in future.
+    public void startOldListeners() {
+        if (!cctx.kernalContext().clientNode()) {
+            // Every GridDhtPartitionDemandMessage addressed to this cache goes to the compatibility path.
+            cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
+                @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+                    processOldDemandMessage(m, id);
+                }
+            });
+        }
+    }
+
+    /**
+     * Serves a demand message: supplies the requested partitions in batches via {@code replyOld}.
+     *
+     * @param d Demand message to serve.
+     * @param id ID of the demanding node.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) {
+        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
+            d.updateSequence(), cctx.cacheId());
+
+        ClusterNode node = cctx.node(id);
+
+        if (node == null)
+            return; // Lookup found no such node (presumably the demander left): nobody to reply to.
+
+        long preloadThrottle = cctx.config().getRebalanceThrottle();
+
+        boolean ack = false;
+
+        try {
+            for (int part : d.partitions()) {
+                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+
+                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                    // Report the partition as missed so the demander knows this node cannot supply it.
+                    s.missed(part);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Requested partition is not owned by local node [part=" + part +
+                            ", demander=" + id + ']');
+
+                    continue;
+                }
+
+                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+                try {
+                    if (cctx.isSwapOrOffheapEnabled()) {
+                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+                        cctx.swap().addOffHeapListener(part, swapLsnr);
+                        cctx.swap().addSwapListener(part, swapLsnr);
+                    }
+
+                    boolean partMissing = false;
+
+                    for (GridCacheEntryEx e : loc.entries()) {
+                        if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                            // Demander no longer needs this partition, so report it as missed and move on.
+                            s.missed(part);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Demanding node does not need requested partition [part=" + part +
+                                    ", nodeId=" + id + ']');
+
+                            partMissing = true;
+
+                            break;
+                        }
+
+                        if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                            ack = true;
+
+                            if (!replyOld(node, d, s))
+                                return;
+
+                            // Throttle preloading.
+                            if (preloadThrottle > 0)
+                                U.sleep(preloadThrottle);
+
+                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                cctx.cacheId());
+                        }
+
+                        GridCacheEntryInfo info = e.info();
+
+                        if (info != null && !info.isNew()) {
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not send cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    if (partMissing)
+                        continue;
+
+                    if (cctx.isSwapOrOffheapEnabled()) {
+                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+                            cctx.swap().iterator(part);
+
+                        // Iterator may be null if space does not exist.
+                        if (iter != null) {
+                            try {
+                                boolean prepared = false;
+
+                                for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
+                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                        // Demander no longer needs this partition: report it as missed and move on.
+                                        s.missed(part);
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Demanding node does not need requested partition " +
+                                                "[part=" + part + ", nodeId=" + id + ']');
+
+                                        partMissing = true;
+
+                                        break; // For.
+                                    }
+
+                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                        ack = true;
+
+                                        if (!replyOld(node, d, s))
+                                            return;
+
+                                        // Throttle preloading.
+                                        if (preloadThrottle > 0)
+                                            U.sleep(preloadThrottle);
+
+                                        s = new GridDhtPartitionSupplyMessage(d.workerId(),
+                                            d.updateSequence(), cctx.cacheId());
+                                    }
+
+                                    GridCacheSwapEntry swapEntry = e.getValue();
+
+                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                                    info.keyBytes(e.getKey());
+                                    info.ttl(swapEntry.ttl());
+                                    info.expireTime(swapEntry.expireTime());
+                                    info.version(swapEntry.version());
+                                    info.value(swapEntry.value());
+
+                                    if (preloadPred == null || preloadPred.apply(info))
+                                        s.addEntry0(part, info, cctx);
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Rebalance predicate evaluated to false (will not send " +
+                                                "cache entry): " + info);
+
+                                        continue;
+                                    }
+
+                                    // Need to manually prepare cache message.
+                                    if (depEnabled && !prepared) {
+                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+                                            swapEntry.valueClassLoaderId() != null ?
+                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+                                                null;
+
+                                        if (ldr == null)
+                                            continue;
+
+                                        if (ldr instanceof GridDeploymentInfo) {
+                                            s.prepare((GridDeploymentInfo)ldr);
+
+                                            prepared = true;
+                                        }
+                                    }
+                                }
+
+                                if (partMissing)
+                                    continue;
+                            }
+                            finally {
+                                iter.close();
+                            }
+                        }
+                    }
+
+                    if (swapLsnr != null) {
+                        // Stop receiving promote notifications before draining what the listener collected.
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+
+                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+                        swapLsnr = null;
+
+                        for (GridCacheEntryInfo info : entries) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition: report it as missed.
+                                // NOTE(review): unlike the loops above, s.last(part) still runs after this break.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition " +
+                                        "[part=" + part + ", nodeId=" + id + ']');
+
+                                // No need to continue iteration over swap entries.
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                ack = true;
+
+                                if (!replyOld(node, d, s))
+                                    return;
+
+                                s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                    cctx.cacheId());
+                            }
+
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not send cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    // Mark as last supply message.
+                    s.last(part);
+
+                    if (ack) {
+                        s.markAck();
+
+                        break; // Partition for loop.
+                    }
+                }
+                finally {
+                    loc.release();
+
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+                }
+            }
+
+            replyOld(node, d, s);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
+        }
+    }
+
+    /**
+     * @param n Node the reply is sent to.
+     * @param d Demand message being answered (provides the topic and the timeout).
+     * @param s Supply message to send.
+     * @return {@code True} if the message was sent, {@code false} if the recipient left the grid.
+     * @throws IgniteCheckedException If sending failed for any other reason.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+        throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+        try {
+            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+        }
+        catch (ClusterTopologyCheckedException nodeLeft) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+            return false;
+        }
+
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 585566b..7a9deba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -163,7 +163,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
             });
 
         supplier = new GridDhtPartitionSupplier(cctx);
-        demander = new GridDhtPartitionDemander(cctx);
+        demander = new GridDhtPartitionDemander(cctx, demandLock);
 
         cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
@@ -350,7 +350,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) {
-        demander.handleSupplyMessage(idx, id, s);
+        demandLock.readLock().lock(); // Hold the demand read lock while the demander handles the message.
+        try {
+            demander.handleSupplyMessage(idx, id, s);
+        }
+        finally {
+            demandLock.readLock().unlock();
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
deleted file mode 100644
index ca564ed..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-
-/**
- *
- */
-public class GridCacheMassiveRebalancingAsyncSelfTest extends GridCacheMassiveRebalancingSyncSelfTest {
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration iCfg = super.getConfiguration(gridName);
-
-        CacheConfiguration cacheCfg = iCfg.getCacheConfiguration()[0];
-
-        cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
-
-        iCfg.setDiscoverySpi(new FailableTcpDiscoverySpi());
-
-        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
-        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
-
-        if (getTestGridName(20).equals(gridName))
-           spi =(FailableTcpDiscoverySpi)iCfg.getDiscoverySpi();
-
-        return iCfg;
-    }
-
-    public static class FailableTcpDiscoverySpi extends TcpDiscoverySpi{
-        public void fail(){
-            simulateNodeFailure();
-        }
-    }
-
-    private volatile FailableTcpDiscoverySpi spi;
-
-    /**
-     * @throws Exception
-     */
-    public void testNodeFailedAtRebalancing() throws Exception {
-        Ignite ignite = startGrid(0);
-
-        generateData(ignite);
-
-        log.info("Preloading started.");
-
-        startGrid(1);
-
-        IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
-
-        f1.get();
-
-        startGrid(20);
-
-        U.sleep(500);
-
-        spi.fail();
-
-        U.sleep(500);
-
-        f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
-        IgniteInternalFuture f0 = ((GridCacheAdapter)grid(0).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
-
-        f1.get();
-        f0.get();
-
-        stopAllGrids();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
deleted file mode 100644
index f69b710..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.util.concurrent.atomic.*;
-
-/**
- *
- */
-public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractTest {
-    /** */
-    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    private static int TEST_SIZE = 1_000_000;
-
-    /** cache name. */
-    protected static String CACHE_NAME_DHT = "cache";
-
-    /** cache 2 name. */
-    protected static String CACHE_2_NAME_DHT = "cache2";
-
-    /** {@inheritDoc} */
-    @Override protected long getTestTimeout() {
-        return Long.MAX_VALUE;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration iCfg = super.getConfiguration(gridName);
-
-        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
-        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
-
-        if (getTestGridName(10).equals(gridName))
-            iCfg.setClientMode(true);
-
-        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
-
-        cacheCfg.setName(CACHE_NAME_DHT);
-        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
-        //cacheCfg.setRebalanceBatchSize(1024);
-        //cacheCfg.setRebalanceBatchesCount(1);
-        cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
-        cacheCfg.setBackups(1);
-
-        CacheConfiguration<Integer, Integer> cacheCfg2 = new CacheConfiguration<>();
-
-        cacheCfg2.setName(CACHE_2_NAME_DHT);
-        cacheCfg2.setCacheMode(CacheMode.PARTITIONED);
-        //cacheCfg2.setRebalanceBatchSize(1024);
-        //cacheCfg2.setRebalanceBatchesCount(1);
-        cacheCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
-        cacheCfg2.setBackups(1);
-
-        iCfg.setRebalanceThreadPoolSize(4);
-        iCfg.setCacheConfiguration(cacheCfg, cacheCfg2);
-        return iCfg;
-    }
-
-    /**
-     * @param ignite Ignite.
-     */
-    protected void generateData(Ignite ignite) {
-        try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) {
-            for (int i = 0; i < TEST_SIZE; i++) {
-                if (i % 1_000_000 == 0)
-                    log.info("Prepared " + i / 1_000_000 + "m entries.");
-
-                stmr.addData(i, i);
-            }
-        }
-        try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_2_NAME_DHT)) {
-            for (int i = 0; i < TEST_SIZE; i++) {
-                if (i % 1_000_000 == 0)
-                    log.info("Prepared " + i / 1_000_000 + "m entries.");
-
-                stmr.addData(i, i + 3);
-            }
-        }
-    }
-
-    /**
-     * @param ignite Ignite.
-     * @throws IgniteCheckedException
-     */
-    protected void checkData(Ignite ignite) throws IgniteCheckedException {
-        for (int i = 0; i < TEST_SIZE; i++) {
-            if (i % 1_000_000 == 0)
-                log.info("Checked " + i / 1_000_000 + "m entries.");
-
-            assert ignite.cache(CACHE_NAME_DHT).get(i) != null && ignite.cache(CACHE_NAME_DHT).get(i).equals(i) :
-                "keys " + i + " does not match (" + ignite.cache(CACHE_NAME_DHT).get(i) + ")";
-        }
-        for (int i = 0; i < TEST_SIZE; i++) {
-            if (i % 1_000_000 == 0)
-                log.info("Checked " + i / 1_000_000 + "m entries.");
-
-            assert ignite.cache(CACHE_2_NAME_DHT).get(i) != null && ignite.cache(CACHE_2_NAME_DHT).get(i).equals(i + 3) :
-                "keys " + i + " does not match (" + ignite.cache(CACHE_2_NAME_DHT).get(i) + ")";
-        }
-    }
-
-    /**
-     * @throws Exception
-     */
-    public void testSimpleRebalancing() throws Exception {
-        Ignite ignite = startGrid(0);
-
-        generateData(ignite);
-
-        log.info("Preloading started.");
-
-        long start = System.currentTimeMillis();
-
-        startGrid(1);
-
-        IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
-
-        f1.get();
-
-        long spend = (System.currentTimeMillis() - start) / 1000;
-
-        stopGrid(0);
-
-        checkData(grid(1));
-
-        log.info("Spend " + spend + " seconds to rebalance entries.");
-
-        stopAllGrids();
-    }
-
-    /**
-     * @throws Exception
-     */
-    public void testComplexRebalancing() throws Exception {
-        Ignite ignite = startGrid(0);
-
-        generateData(ignite);
-
-        log.info("Preloading started.");
-
-        long start = System.currentTimeMillis();
-
-        //will be started simultaneously in case of ASYNC mode
-        startGrid(1);
-        startGrid(2);
-        startGrid(3);
-        startGrid(4);
-
-        //wait until cache rebalanced in async mode
-
-        GridCachePreloader p11 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
-        GridCachePreloader p12 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
-        GridCachePreloader p13 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
-        GridCachePreloader p14 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
-
-        GridCachePreloader p21 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
-        GridCachePreloader p22 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
-        GridCachePreloader p23 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
-        GridCachePreloader p24 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
-
-        IgniteInternalFuture f24 = p24.syncFuture();
-        f24.get();
-
-        IgniteInternalFuture f14 = p14.syncFuture();
-        f14.get();
-
-        AffinityTopologyVersion f4Top = ((GridDhtPartitionDemander.SyncFuture)f24).topologyVersion();
-
-        IgniteInternalFuture f11 = p11.syncFuture();
-        IgniteInternalFuture f12 = p12.syncFuture();
-        IgniteInternalFuture f13 = p13.syncFuture();
-
-        while (!((GridDhtPartitionDemander.SyncFuture)f11).topologyVersion().equals(f4Top) ||
-            !((GridDhtPartitionDemander.SyncFuture)f12).topologyVersion().equals(f4Top) ||
-            !((GridDhtPartitionDemander.SyncFuture)f13).topologyVersion().equals(f4Top)) {
-            U.sleep(100);
-
-            f11 = p11.syncFuture();
-            f12 = p12.syncFuture();
-            f13 = p13.syncFuture();
-        }
-        f11.get();
-        f12.get();
-        f13.get();
-
-        IgniteInternalFuture f21 = p21.syncFuture();
-        IgniteInternalFuture f22 = p22.syncFuture();
-        IgniteInternalFuture f23 = p23.syncFuture();
-
-        while (!((GridDhtPartitionDemander.SyncFuture)f21).topologyVersion().equals(f4Top) ||
-            !((GridDhtPartitionDemander.SyncFuture)f22).topologyVersion().equals(f4Top) ||
-            !((GridDhtPartitionDemander.SyncFuture)f23).topologyVersion().equals(f4Top)) {
-            U.sleep(100);
-
-            f21 = p21.syncFuture();
-            f22 = p22.syncFuture();
-            f23 = p23.syncFuture();
-        }
-        f21.get();
-        f22.get();
-        f23.get();
-
-        //cache rebalanced in async node
-
-        f11 = p11.syncFuture();
-        f12 = p12.syncFuture();
-        f13 = p13.syncFuture();
-        f14 = p14.syncFuture();
-
-        f21 = p21.syncFuture();
-        f22 = p22.syncFuture();
-        f23 = p23.syncFuture();
-        f24 = p24.syncFuture();
-
-        stopGrid(0);
-
-        //wait until cache rebalanced
-
-        while (f11 == p11.syncFuture() || f12 == p12.syncFuture() || f13 == p13.syncFuture() || f14 == p14.syncFuture())
-            U.sleep(100);
-
-        while (f21 == p21.syncFuture() || f22 == p22.syncFuture() || f23 == p23.syncFuture() || f24 == p24.syncFuture())
-            U.sleep(100);
-
-        p11.syncFuture().get();
-        p12.syncFuture().get();
-        p13.syncFuture().get();
-        p14.syncFuture().get();
-
-        p21.syncFuture().get();
-        p22.syncFuture().get();
-        p23.syncFuture().get();
-        p24.syncFuture().get();
-
-        //cache rebalanced
-
-        f12 = p12.syncFuture();
-        f13 = p13.syncFuture();
-        f14 = p14.syncFuture();
-
-        f22 = p22.syncFuture();
-        f23 = p23.syncFuture();
-        f24 = p24.syncFuture();
-
-        stopGrid(1);
-
-        //wait until cache rebalanced
-
-        while (f12 == p12.syncFuture() || f13 == p13.syncFuture() || f14 == p14.syncFuture())
-            U.sleep(100);
-
-        while (f22 == p22.syncFuture() || f23 == p23.syncFuture() || f24 == p24.syncFuture())
-            U.sleep(100);
-
-        p12.syncFuture().get();
-        p13.syncFuture().get();
-        p14.syncFuture().get();
-
-        p22.syncFuture().get();
-        p23.syncFuture().get();
-        p24.syncFuture().get();
-
-        //cache rebalanced
-
-        f13 = p13.syncFuture();
-        f14 = p14.syncFuture();
-
-        f23 = p23.syncFuture();
-        f24 = p24.syncFuture();
-
-        stopGrid(2);
-
-        //wait until cache rebalanced
-
-        while (f13 == p13.syncFuture() || f14 == p14.syncFuture())
-            U.sleep(100);
-
-        while (f23 == p23.syncFuture() || f24 == p24.syncFuture())
-            U.sleep(100);
-
-        p13.syncFuture().get();
-        p14.syncFuture().get();
-
-        p23.syncFuture().get();
-        p24.syncFuture().get();
-
-        //cache rebalanced
-
-        stopGrid(3);
-
-        long spend = (System.currentTimeMillis() - start) / 1000;
-
-        checkData(grid(4));
-
-        log.info("Spend " + spend + " seconds to rebalance entries.");
-
-        stopAllGrids();
-    }
-
-    /**
-     * @throws Exception
-     */
-    public void _testOpPerSecRebalancingTest() throws Exception {
-        startGrid(0);
-
-        final AtomicBoolean cancelled = new AtomicBoolean(false);
-
-        generateData(grid(0));
-
-        startGrid(1);
-        startGrid(2);
-        startGrid(10);
-
-        Thread t = new Thread(new Runnable() {
-            @Override public void run() {
-
-                long spend = 0;
-
-                long ops = 0;
-
-                while (!cancelled.get()) {
-                    try {
-                        long start = System.currentTimeMillis();
-
-                        int size = 1000;
-
-                        for (int i = 0; i < size; i++)
-                            grid(10).cachex(CACHE_NAME_DHT).remove(i);
-
-                        for (int i = 0; i < size; i++)
-                            grid(10).cachex(CACHE_NAME_DHT).put(i, i);
-
-                        spend += System.currentTimeMillis() - start;
-
-                        ops += size * 2;
-                    }
-                    catch (IgniteCheckedException e) {
-                        e.printStackTrace();
-                    }
-
-                    log.info("Ops. per ms: " + ops / spend);
-                }
-            }
-        });
-        t.start();
-
-        stopGrid(0);
-        startGrid(0);
-
-        stopGrid(0);
-        startGrid(0);
-
-        stopGrid(0);
-        startGrid(0);
-
-        cancelled.set(true);
-        t.join();
-
-        checkData(grid(10));
-
-        //stopAllGrids();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
new file mode 100644
index 0000000..a17fc7a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
@@ -0,0 +1,85 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+
+/**
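+ * Runs the {@link GridCacheRebalancingSyncSelfTest} tests in ASYNC rebalance mode and adds a node failure test.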
+ */
+public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncSelfTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        // Switch the first two configured caches to asynchronous rebalancing.
+        CacheConfiguration[] cacheCfgs = iCfg.getCacheConfiguration();
+
+        cacheCfgs[0].setRebalanceMode(CacheRebalanceMode.ASYNC);
+        cacheCfgs[1].setRebalanceMode(CacheRebalanceMode.ASYNC);
+
+        // A fresh discovery SPI replaces the configured one, so it gets the IP finder and server mode again.
+        FailableTcpDiscoverySpi failableSpi = new FailableTcpDiscoverySpi();
+
+        iCfg.setDiscoverySpi(failableSpi);
+        failableSpi.setIpFinder(ipFinder);
+        failableSpi.setForceServerMode(true);
+
+        if (getTestGridName(20).equals(gridName))
+            spi = failableSpi;
+
+        return iCfg;
+    }
+
+    /** Discovery SPI of node 20, stored by getConfiguration() so that the test can fail that node. */
+    private volatile FailableTcpDiscoverySpi spi;
+
+    /** Discovery SPI whose public {@code fail()} delegates to {@code simulateNodeFailure()}. */
+    public static class FailableTcpDiscoverySpi extends TcpDiscoverySpi {
+        public void fail() { simulateNodeFailure(); }
+    }
+
+    /**
+     * Fails node 20 once it has rebalanced and waits until nodes 0 and 1 have rebalanced on the next topology version.
+     * @throws Exception If failed.
+     */
+    public void testNodeFailedAtRebalancing() throws Exception {
+        try {
+            generateData(startGrid(0));
+
+            log.info("Preloading started.");
+
+            startGrid(1);
+            waitForRebalancing(1, 2);
+
+            startGrid(20);
+            waitForRebalancing(20, 3);
+
+            spi.fail();
+
+            waitForRebalancing(0, 4);
+            waitForRebalancing(1, 4);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
new file mode 100644
index 0000000..0cb6c7b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -0,0 +1,269 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+/**
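+ * Rebalancing tests for two partitioned caches configured with {@code CacheRebalanceMode.SYNC} and one backup.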
+ */
+public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node configured by this test. */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+    /** Number of keys written to and verified in each cache. */
+    private static int TEST_SIZE = 1_000_000;
+
+    /** Name of the first cache. */
+    protected static String CACHE_NAME_DHT = "cache";
+
+    /** Name of the second cache. */
+    protected static String CACHE_2_NAME_DHT = "cache2";
+
+    /** {@inheritDoc} Returns {@code Long.MAX_VALUE}. */
+    @Override protected long getTestTimeout() {
+        return Long.MAX_VALUE;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)iCfg.getDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+        discoSpi.setForceServerMode(true);
+
+        if (getTestGridName(10).equals(gridName))
+            iCfg.setClientMode(true);
+
+        iCfg.setRebalanceThreadPoolSize(4);
+        iCfg.setCacheConfiguration(syncCacheConfiguration(CACHE_NAME_DHT), syncCacheConfiguration(CACHE_2_NAME_DHT));
+
+        return iCfg;
+    }
+
+    /**
+     * @param name Cache name.
+     * @return Partitioned cache configuration with SYNC rebalance mode and one backup.
+     */
+    private static CacheConfiguration<Integer, Integer> syncCacheConfiguration(String name) {
+        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
+        cacheCfg.setName(name);
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+        cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheCfg.setBackups(1);
+
+        return cacheCfg;
+    }
+
+    /**
+     * Loads {@code (i, i)} into the first cache and {@code (i, i + 3)} into the second, for {@code TEST_SIZE} keys.
+     *
+     * @param ignite Ignite.
+     */
+    protected void generateData(Ignite ignite) {
+        generateData(ignite, CACHE_NAME_DHT, 0);
+        generateData(ignite, CACHE_2_NAME_DHT, 3);
+    }
+
+    /** Streams {@code (i, i + off)} for every {@code i} below {@code TEST_SIZE} into the named cache. */
+    private void generateData(Ignite ignite, String cacheName, int off) {
+        try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(cacheName)) {
+            for (int i = 0; i < TEST_SIZE; i++) {
+                if (i % 1_000_000 == 0)
+                    log.info("Prepared " + i / 1_000_000 + "m entries.");
+
+                stmr.addData(i, i + off);
+            }
+        }
+    }
+
+    /**
+     * Asserts that the first cache maps each key to itself and the second maps each key to the key plus 3.
+     * @param ignite Ignite.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected void checkData(Ignite ignite) throws IgniteCheckedException {
+        checkData(ignite, CACHE_NAME_DHT, 0);
+        checkData(ignite, CACHE_2_NAME_DHT, 3);
+    }
+
+    /** Reads each key once, so the value that is asserted is also the value printed on failure. */
+    private void checkData(Ignite ignite, String cacheName, int off) {
+        IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
+        for (int i = 0; i < TEST_SIZE; i++) {
+            if (i % 1_000_000 == 0)
+                log.info("Checked " + i / 1_000_000 + "m entries.");
+            Integer val = cache.get(i);
+            assert val != null && val.equals(i + off) : "key " + i + " does not match (" + val + ")";
+        }
+    }
+
+    /**
+     * Rebalances both caches from node 0 to node 1, then stops node 0 and checks the data on node 1.
+     * @throws Exception If failed.
+     */
+    public void testSimpleRebalancing() throws Exception {
+        try {
+            generateData(startGrid(0));
+
+            log.info("Preloading started.");
+
+            long start = System.currentTimeMillis();
+
+            startGrid(1);
+            // Wait for every cache on node 1: checkData() reads both caches and node 0 is stopped next.
+            waitForRebalancing(1, 2);
+
+            long spend = (System.currentTimeMillis() - start) / 1000;
+
+            stopGrid(0);
+
+            checkData(grid(1));
+
+            log.info("Spend " + spend + " seconds to rebalance entries.");
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Waits until the sync future of every internal cache on the node is for the given topology version and is done.
+     * @param id Id.
+     * @param top Topology.
+     * @throws IgniteCheckedException If waiting for a future or the 100 ms pause between polls fails.
+     */
+    protected void waitForRebalancing(int id, int top) throws IgniteCheckedException {
+        while (!isRebalanced(id, top))
+            org.apache.ignite.internal.util.typedef.internal.U.sleep(100);
+    }
+
+    /** Single polling pass; returns {@code false} at the first cache whose sync future is for another version. */
+    private boolean isRebalanced(int id, int top) throws IgniteCheckedException {
+        for (GridCacheAdapter c : grid(id).context().cache().internalCaches()) {
+            GridDhtPartitionDemander.SyncFuture fut = (GridDhtPartitionDemander.SyncFuture)c.preloader().syncFuture();
+            if (fut.topologyVersion().topologyVersion() != top)
+                return false;
+            fut.get();
+        }
+
+        return true;
+    }
+
+    /**
+     * Starts five nodes, stops nodes 0 to 3 one at a time and waits after each topology change until the
+     * remaining nodes have rebalanced; finally checks the data on node 4.
+     *
+     * @throws Exception If failed.
+     */
+    public void testComplexRebalancing() throws Exception {
+        try {
+            Ignite ignite = startGrid(0);
+
+            generateData(ignite);
+
+            log.info("Preloading started.");
+
+            long start = System.currentTimeMillis();
+
+            // Will be started simultaneously in case of ASYNC mode.
+            startGrid(1);
+            startGrid(2);
+            startGrid(3);
+            startGrid(4);
+
+            // Expect topology version 5 after the five node starts.
+            waitForRebalancing(1, 5);
+            waitForRebalancing(2, 5);
+            waitForRebalancing(3, 5);
+            waitForRebalancing(4, 5);
+
+            stopGrid(0);
+
+            // Expect topology version 6 once node 0 has been stopped.
+            waitForRebalancing(1, 6);
+            waitForRebalancing(2, 6);
+            waitForRebalancing(3, 6);
+            waitForRebalancing(4, 6);
+
+            stopGrid(1);
+
+            // Expect topology version 7 once node 1 has been stopped.
+            waitForRebalancing(2, 7);
+            waitForRebalancing(3, 7);
+            waitForRebalancing(4, 7);
+
+            stopGrid(2);
+
+            // Expect topology version 8 once node 2 has been stopped.
+            waitForRebalancing(3, 8);
+            waitForRebalancing(4, 8);
+
+            // No wait follows the last stop: node 4 is the only node still running.
+            stopGrid(3);
+
+            long spend = (System.currentTimeMillis() - start) / 1000;
+
+            checkData(grid(4));
+
+            log.info("Spend " + spend + " seconds to rebalance entries.");
+        }
+        finally {
+            // Stop the remaining nodes even when a wait or the data check fails.
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Rebalances to node 1 from a node whose REBALANCING_VERSION attribute has been overwritten with 0.
+     * @throws Exception If failed.
+     */
+    public void testBackwardCompatibility() throws Exception {
+        try {
+            Ignite ignite = startGrid(0);
+
+            Map<String, Object> attrs = new HashMap<>(ignite.cluster().localNode().attributes());
+            attrs.put(IgniteNodeAttributes.REBALANCING_VERSION, 0);
+            ((TcpDiscoveryNode)ignite.cluster().localNode()).setAttributes(attrs);
+
+            generateData(ignite);
+            startGrid(1);
+            waitForRebalancing(1, 2);
+
+            stopGrid(0);
+            checkData(grid(1));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}
\ No newline at end of file