You are viewing a plain-text version of this content. The canonical version is available at the original archive link ("here" in the HTML page).
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/27 10:00:39 UTC

[1/5] ignite git commit: Ignite-1093

Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-3 [created] a34a408bf


http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 74237f8..edcc18c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -17,15 +17,19 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -47,27 +51,38 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
 
 /**
  * DHT cache preloader.
  */
 public class GridDhtPreloader extends GridCachePreloaderAdapter {
+    /** */
+    public static final IgniteProductVersion REBALANCING_VER_2_SINCE = IgniteProductVersion.fromString("1.5.0");
+
     /** Default preload resend timeout. */
     public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500;
 
@@ -81,10 +96,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
 
     /** Partition suppliers. */
-    private GridDhtPartitionSupplyPool supplyPool;
+    private GridDhtPartitionSupplier supplier;
 
     /** Partition demanders. */
-    private GridDhtPartitionDemandPool demandPool;
+    private GridDhtPartitionDemander demander;
 
     /** Start future. */
     private GridFutureAdapter<Object> startFut;
@@ -92,10 +107,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** Busy lock to prevent activities from accessing exchanger while it's stopping. */
     private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
 
+    /** Demand lock. */
+    private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
+
     /** Pending affinity assignment futures. */
     private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
         new ConcurrentHashMap8<>();
 
+    /** */
+    private final Queue<GridDhtLocalPartition> partitionsToEvict = new ConcurrentLinkedDeque8<>();
+
+    /** */
+    private final AtomicReference<Integer> partitionsEvictionOwning = new AtomicReference<>(0);
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -179,8 +203,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                 }
             });
 
-        supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock);
-        demandPool = new GridDhtPartitionDemandPool(cctx, busyLock);
+        supplier = new GridDhtPartitionSupplier(cctx);
+        demander = new GridDhtPartitionDemander(cctx, busyLock);
+
+        supplier.start();
+        demander.start();
 
         cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
@@ -199,19 +226,16 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         final long startTopVer = loc.order();
 
         topVer.setIfGreater(startTopVer);
-
-        supplyPool.start();
-        demandPool.start();
     }
 
     /** {@inheritDoc} */
     @Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
         super.preloadPredicate(preloadPred);
 
-        assert supplyPool != null && demandPool != null : "preloadPredicate may be called only after start()";
+        assert supplier != null && demander != null : "preloadPredicate may be called only after start()";
 
-        supplyPool.preloadPredicate(preloadPred);
-        demandPool.preloadPredicate(preloadPred);
+        supplier.preloadPredicate(preloadPred);
+        demander.preloadPredicate(preloadPred);
     }
 
     /** {@inheritDoc} */
@@ -225,37 +249,104 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         // Acquire write busy lock.
         busyLock.writeLock().lock();
 
-        if (supplyPool != null)
-            supplyPool.stop();
+        if (supplier != null)
+            supplier.stop();
 
-        if (demandPool != null)
-            demandPool.stop();
+        if (demander != null)
+            demander.stop();
 
         top = null;
     }
 
     /** {@inheritDoc} */
     @Override public void onInitialExchangeComplete(@Nullable Throwable err) {
-        if (err == null) {
+        if (err == null)
             startFut.onDone();
+        else
+            startFut.onDone(err);
+    }
 
-            final long start = U.currentTimeMillis();
+    /** {@inheritDoc} */
+    @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
+        demander.updateLastExchangeFuture(lastFut);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
+        // No assignments for disabled preloader.
+        GridDhtPartitionTopology top = cctx.dht().topology();
+
+        if (!cctx.rebalanceEnabled())
+            return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+
+        int partCnt = cctx.affinity().partitions();
+
+        assert exchFut.forcePreload() || exchFut.dummyReassign() ||
+            exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
+            "Topology version mismatch [exchId=" + exchFut.exchangeId() +
+                ", topVer=" + top.topologyVersion() + ']';
+
+        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+
+        AffinityTopologyVersion topVer = assigns.topologyVersion();
+
+        for (int p = 0; p < partCnt; p++) {
+            if (cctx.shared().exchange().hasPendingExchange()) {
+                if (log.isDebugEnabled())
+                    log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
+                        exchFut.exchangeId());
+
+                break;
+            }
 
-            final CacheConfiguration cfg = cctx.config();
+            // If partition belongs to local node.
+            if (cctx.affinity().localNode(p, topVer)) {
+                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
 
-            if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
-                U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
+                assert part != null;
+                assert part.id() == p;
 
-                demandPool.syncFuture().listen(new CI1<Object>() {
-                    @Override public void apply(Object t) {
-                        U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " +
-                            "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
+                if (part.state() != MOVING) {
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping partition assignment (state is not MOVING): " + part);
+
+                    continue; // For.
+                }
+
+                Collection<ClusterNode> picked = pickedOwners(p, topVer);
+
+                if (picked.isEmpty()) {
+                    top.own(part);
+
+                    if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+
+                        cctx.events().addPreloadEvent(p,
+                            EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
+                            discoEvt.type(), discoEvt.timestamp());
                     }
-                });
+
+                    if (log.isDebugEnabled())
+                        log.debug("Owning partition as there are no other owners: " + part);
+                }
+                else {
+                    ClusterNode n = F.rand(picked);
+
+                    GridDhtPartitionDemandMessage msg = assigns.get(n);
+
+                    if (msg == null) {
+                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+                            top.updateSequence(),
+                            exchFut.exchangeId().topologyVersion(),
+                            cctx.cacheId()));
+                    }
+
+                    msg.addPartition(p);
+                }
             }
         }
-        else
-            startFut.onDone(err);
+
+        return assigns;
     }
 
     /** {@inheritDoc} */
@@ -269,24 +360,77 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         topVer.set(topVer0);
     }
 
-    /** {@inheritDoc} */
-    @Override public void onExchangeFutureAdded() {
-        demandPool.onExchangeFutureAdded();
+    /**
+     * @param p Partition.
+     * @param topVer Topology version.
+     * @return Picked owners.
+     */
+    private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
+
+        int affCnt = affNodes.size();
+
+        Collection<ClusterNode> rmts = remoteOwners(p, topVer);
+
+        int rmtCnt = rmts.size();
+
+        if (rmtCnt <= affCnt)
+            return rmts;
+
+        List<ClusterNode> sorted = new ArrayList<>(rmts);
+
+        // Sort in descending order, so nodes with higher order will be first.
+        Collections.sort(sorted, CU.nodeComparator(false));
+
+        // Pick newest nodes.
+        return sorted.subList(0, affCnt);
+    }
+
+    /**
+     * @param p Partition.
+     * @param topVer Topology version.
+     * @return Nodes owning this partition.
+     */
+    private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
+        return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
     }
 
     /** {@inheritDoc} */
-    @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
-        demandPool.updateLastExchangeFuture(lastFut);
+    public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) {
+        if (!enterBusy())
+            return;
+
+        try {
+            demandLock.readLock().lock();
+            try {
+                demander.handleSupplyMessage(idx, id, s);
+            }
+            finally {
+                demandLock.readLock().unlock();
+            }
+        }
+        finally {
+            leaveBusy();
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
-        return demandPool.assign(exchFut);
+    public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
+        if (!enterBusy())
+            return;
+
+        try {
+            supplier.handleDemandMessage(idx, id, d);
+        }
+        finally {
+            leaveBusy();
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
-        demandPool.addAssignments(assignments, forcePreload);
+    @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
+        boolean forcePreload, Collection<String> caches, int cnt) {
+        return demander.addAssignments(assignments, forcePreload, caches, cnt);
     }
 
     /**
@@ -298,7 +442,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> syncFuture() {
-        return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
+        return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
+        return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture();
     }
 
     /**
@@ -581,12 +730,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void forcePreload() {
-        demandPool.forcePreload();
+        demander.forcePreload();
     }
 
     /** {@inheritDoc} */
     @Override public void unwindUndeploys() {
-        demandPool.unwindUndeploys();
+        demandLock.writeLock().lock();
+
+        try {
+            cctx.deploy().unwind(cctx);
+        }
+        finally {
+            demandLock.writeLock().unlock();
+        }
     }
 
     /**
@@ -607,6 +763,41 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         forceKeyFuts.remove(fut.futureId(), fut);
     }
 
+    /** {@inheritDoc} */
+    @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
+        partitionsToEvict.add(part);
+
+        // NOTE(review): partitionsEvictionOwning is an AtomicReference<Integer>, so compareAndSet compares references;
+        // this only works because autoboxed 0 and 1 come from the Integer cache. An AtomicInteger would be safer.
+        // At most one eviction task runs at a time: only the caller that flips the flag 0 -> 1 submits one.
+        if (partitionsEvictionOwning.compareAndSet(0, 1)) {
+            cctx.closures().callLocalSafe(new GPC<Boolean>() {
+                @Override public Boolean call() {
+                    while (true) {
+                        try {
+                            GridDhtLocalPartition evictPart;
+
+                            // Drain everything queued so far while holding ownership.
+                            while ((evictPart = partitionsToEvict.poll()) != null)
+                                evictPart.tryEvict();
+                        }
+                        finally {
+                            boolean res = partitionsEvictionOwning.compareAndSet(1, 0);
+
+                            assert res;
+                        }
+
+                        // A partition added after the last poll() but before ownership was released could not
+                        // submit its own task (its caller's CAS failed), so re-check the queue and take
+                        // ownership again rather than leave that partition stranded until the next call.
+                        if (partitionsToEvict.isEmpty() || !partitionsEvictionOwning.compareAndSet(0, 1))
+                            return false;
+                    }
+                }
+            }, /*system pool*/ true);
+        }
+    }
+
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 7c5e97c..810bd8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -1292,6 +1292,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                                         catch (IgniteCheckedException e) {
                                             U.error(log, "Failed to remove count down latch: " + latch0.name(), e);
                                         }
+                                        finally {
+                                            ctx.cache().context().txContextReset();
+                                        }
                                     }
                                 });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 26a41de..9315d7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -696,7 +696,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         if (log.isDebugEnabled())
                             U.warn(log, "Received response for unknown child job (was job presumed failed?): " + res);
 
-                        return;
+                        selfOccupied = true;
+
+                        continue;
                     }
 
                     // Only process 1st response and ignore following ones. This scenario

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
index 835cdcb..c95a859 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
@@ -239,7 +239,7 @@ public class GridTuple4<V1, V2, V3, V4> implements Iterable<Object>, Externaliza
 
         GridTuple4<?, ?, ?, ?> t = (GridTuple4<?, ?, ?, ?>)o;
 
-        return F.eq(val1, t.val2) && F.eq(val2, t.val2) && F.eq(val3, t.val3) && F.eq(val4, t.val4);
+        return F.eq(val1, t.val1) && F.eq(val2, t.val2) && F.eq(val3, t.val3) && F.eq(val4, t.val4);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 1824339..183838b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -525,6 +525,9 @@ public class GridNioServer<T> {
         assert ses instanceof GridSelectorNioSessionImpl;
         assert op == NioOperation.PAUSE_READ || op == NioOperation.RESUME_READ;
 
+        if (log.isDebugEnabled())
+            log.debug((op == NioOperation.PAUSE_READ ? "Pausing" : "Resuming") + " reads [ses=" + ses + ']');
+
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
 
         if (impl.closed())

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 6254605..854ce95 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1956,7 +1956,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * <p>
      * This method is intended for test purposes only.
      */
-    void simulateNodeFailure() {
+    protected void simulateNodeFailure() {
         impl.simulateNodeFailure();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index 3913957..d868468 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@ -105,6 +105,9 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
 
         GridTestUtils.setMemoryMode(cfg, ccfg, memMode, 100, 1024);
 
+        // Temporary workaround (shared memory port -1): revert once https://issues.apache.org/jira/browse/IGNITE-1578 is fixed.
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
         return cfg;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
new file mode 100644
index 0000000..c65a0ed
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
@@ -0,0 +1,63 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
+
+/**
+ * Runs the {@link GridCacheRebalancingSyncSelfTest} tests with every configured cache in {@link CacheRebalanceMode#ASYNC} mode.
+ */
+public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncSelfTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        for (CacheConfiguration cacheCfg : iCfg.getCacheConfiguration()) {
+            cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
+        }
+
+        return iCfg;
+    }
+
+    /** Waits for grid 1 rebalancing to complete, then simulates grid 1 failure (after, not during, rebalancing).
+     * @throws Exception If failed.
+     */
+    public void testNodeFailedAtRebalancing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        generateData(ignite, 0, 0);
+
+        log.info("Preloading started.");
+
+        startGrid(1);
+
+        GridDhtPartitionDemander.RebalanceFuture fut = (GridDhtPartitionDemander.RebalanceFuture)grid(1).context().
+            cache().internalCache(CACHE_NAME_DHT_REPLICATED).preloader().rebalanceFuture();
+
+        fut.get();
+
+        ((TestTcpDiscoverySpi)grid(1).configuration().getDiscoverySpi()).simulateNodeFailure();
+
+        checkSupplyContextMapIsEmpty();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
new file mode 100644
index 0000000..cea7808
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -0,0 +1,472 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests cache rebalancing with {@link CacheRebalanceMode#SYNC} caches: data loaded before and during topology
+ * changes must stay readable, and no node may keep supply contexts once rebalancing is over.
+ */
+public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
+    /** IP finder shared by all nodes started by the test. */
+    protected static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of entries put into each cache. */
+    private static final int TEST_SIZE = 100_000;
+
+    /** Partitioned cache name. */
+    protected static final String CACHE_NAME_DHT_PARTITIONED = "cacheP";
+
+    /** Second partitioned cache name. */
+    protected static final String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2";
+
+    /** Replicated cache name. */
+    protected static final String CACHE_NAME_DHT_REPLICATED = "cacheR";
+
+    /** Second replicated cache name. */
+    protected static final String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2";
+
+    /** Stop flag for the load threads of {@link #testLoadRebalancing()}; also set by {@link #testComplexRebalancing()}. */
+    private volatile boolean concurrentStartFinished;
+
+    /** Set by {@link #testComplexRebalancing()} once nodes 3 and 4 have been started. */
+    private volatile boolean concurrentStartFinished2;
+
+    /** Set by {@link #testComplexRebalancing()} once the second data generation pass is done. */
+    private volatile boolean concurrentStartFinished3;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+
+        // Node with index 10 is started as a client.
+        if (getTestGridName(10).equals(gridName))
+            iCfg.setClientMode(true);
+
+        CacheConfiguration<Integer, Integer> cachePCfg = new CacheConfiguration<>();
+
+        cachePCfg.setName(CACHE_NAME_DHT_PARTITIONED);
+        cachePCfg.setCacheMode(CacheMode.PARTITIONED);
+        cachePCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cachePCfg.setBackups(1);
+        cachePCfg.setRebalanceBatchSize(1);
+        cachePCfg.setRebalanceBatchesCount(1);
+        cachePCfg.setRebalanceOrder(2);
+
+        CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>();
+
+        cachePCfg2.setName(CACHE_NAME_DHT_PARTITIONED_2);
+        cachePCfg2.setCacheMode(CacheMode.PARTITIONED);
+        cachePCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cachePCfg2.setBackups(1);
+        cachePCfg2.setRebalanceOrder(2);
+        // Known issue: delaying rebalancing of a low priority cache (e.g. setRebalanceDelay(5000)) may deadlock,
+        // so no rebalance delay is configured here.
+
+        CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>();
+
+        cacheRCfg.setName(CACHE_NAME_DHT_REPLICATED);
+        cacheRCfg.setCacheMode(CacheMode.REPLICATED);
+        cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheRCfg.setRebalanceBatchSize(1);
+        cacheRCfg.setRebalanceBatchesCount(Integer.MAX_VALUE);
+
+        // Workaround: shared memory communication fails with Integer.MAX_VALUE rebalance batches count.
+        ((TcpCommunicationSpi)iCfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        CacheConfiguration<Integer, Integer> cacheRCfg2 = new CacheConfiguration<>();
+
+        cacheRCfg2.setName(CACHE_NAME_DHT_REPLICATED_2);
+        cacheRCfg2.setCacheMode(CacheMode.REPLICATED);
+        cacheRCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheRCfg2.setRebalanceOrder(4);
+
+        iCfg.setCacheConfiguration(cachePCfg, cachePCfg2, cacheRCfg, cacheRCfg2);
+
+        iCfg.setRebalanceThreadPoolSize(2);
+
+        return iCfg;
+    }
+
+    /**
+     * Fills all four test caches with {@link #TEST_SIZE} entries each.
+     *
+     * @param ignite Node to put data through.
+     * @param from First key.
+     * @param iter Iteration number mixed into the values so that different passes produce different data.
+     */
+    protected void generateData(Ignite ignite, int from, int iter) {
+        generateData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter);
+        generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter);
+        generateData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter);
+        generateData(ignite, CACHE_NAME_DHT_REPLICATED_2, from, iter);
+    }
+
+    /**
+     * Puts keys {@code [from, from + TEST_SIZE)} into the cache; the value of key {@code i} is
+     * {@code i + name.hashCode() + iter}.
+     *
+     * @param ignite Node to put data through.
+     * @param name Cache name.
+     * @param from First key.
+     * @param iter Iteration number mixed into the values.
+     */
+    protected void generateData(Ignite ignite, String name, int from, int iter) {
+        IgniteCache<Integer, Integer> cache = ignite.cache(name);
+
+        for (int i = from; i < from + TEST_SIZE; i++) {
+            // Progress is reported relative to the first key.
+            if ((i - from) % (TEST_SIZE / 10) == 0)
+                log.info("Prepared " + (i - from) * 100 / TEST_SIZE + "% entries (" + TEST_SIZE + ").");
+
+            cache.put(i, i + name.hashCode() + iter);
+        }
+    }
+
+    /**
+     * Checks that all four test caches contain the data written by {@link #generateData(Ignite, int, int)}.
+     *
+     * @param ignite Node to read data through.
+     * @param from First key.
+     * @param iter Iteration number the data was generated with.
+     * @throws IgniteCheckedException Exception.
+     */
+    protected void checkData(Ignite ignite, int from, int iter) throws IgniteCheckedException {
+        checkData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter);
+        checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter);
+        checkData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter);
+        checkData(ignite, CACHE_NAME_DHT_REPLICATED_2, from, iter);
+    }
+
+    /**
+     * Checks that keys {@code [from, from + TEST_SIZE)} of the cache hold the values written by
+     * {@link #generateData(Ignite, String, int, int)}. Fails the test on the first mismatch or missing key.
+     *
+     * @param ignite Node to read data through.
+     * @param name Cache name.
+     * @param from First key.
+     * @param iter Iteration number the data was generated with.
+     * @throws IgniteCheckedException Exception.
+     */
+    protected void checkData(Ignite ignite, String name, int from, int iter) throws IgniteCheckedException {
+        IgniteCache<Integer, Integer> cache = ignite.cache(name);
+
+        for (int i = from; i < from + TEST_SIZE; i++) {
+            if ((i - from) % (TEST_SIZE / 10) == 0)
+                log.info("<" + name + "> Checked " + (i - from) * 100 / TEST_SIZE + "% entries (" + TEST_SIZE + ").");
+
+            Integer expected = i + name.hashCode() + iter;
+
+            // Read each key once so the reported value is the one that was checked.
+            Integer actual = cache.get(i);
+
+            // Explicit fail() instead of 'assert' so the check also works when assertions are disabled.
+            if (!expected.equals(actual))
+                fail(i + " value " + expected + " does not match (" + actual + ")");
+        }
+    }
+
+    /**
+     * Loads data into one node, then adds and removes nodes one at a time and checks the data on the survivor.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSimpleRebalancing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        generateData(ignite, 0, 0);
+
+        log.info("Preloading started.");
+
+        long start = System.currentTimeMillis();
+
+        startGrid(1);
+
+        waitForRebalancing(0, 2);
+        waitForRebalancing(1, 2);
+
+        stopGrid(0);
+
+        waitForRebalancing(1, 3);
+
+        startGrid(2);
+
+        waitForRebalancing(1, 4);
+        waitForRebalancing(2, 4);
+
+        stopGrid(2);
+
+        waitForRebalancing(1, 5);
+
+        long spend = (System.currentTimeMillis() - start) / 1000;
+
+        checkData(grid(1), 0, 0);
+
+        log.info("Spend " + spend + " seconds to rebalance entries.");
+    }
+
+    /**
+     * Starts a node while other threads keep writing and verifying the partitioned cache.
+     * Any failure in those threads fails the test.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLoadRebalancing() throws Exception {
+        final Ignite ignite = startGrid(0);
+
+        startGrid(1);
+
+        generateData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);
+
+        log.info("Preloading started.");
+
+        long start = System.currentTimeMillis();
+
+        concurrentStartFinished = false;
+
+        final AtomicReference<Throwable> err = new AtomicReference<>();
+
+        // Keeps rewriting the same values while the new node joins.
+        Thread t1 = new Thread() {
+            @Override public void run() {
+                try {
+                    while (!concurrentStartFinished)
+                        generateData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);
+                }
+                catch (Throwable e) {
+                    onBackgroundError(err, e);
+                }
+            }
+        };
+
+        // Keeps verifying the data while the new node joins.
+        Thread t2 = new Thread() {
+            @Override public void run() {
+                try {
+                    while (!concurrentStartFinished)
+                        checkData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);
+                }
+                catch (Throwable e) {
+                    onBackgroundError(err, e);
+                }
+            }
+        };
+
+        t1.start();
+        t2.start();
+
+        try {
+            startGrid(2);
+
+            waitForRebalancing(2, 3);
+        }
+        finally {
+            // Stop the load threads even if the node failed to start, so they do not outlive the test.
+            concurrentStartFinished = true;
+
+            t1.join();
+            t2.join();
+        }
+
+        checkBackgroundError(err);
+
+        long spend = (System.currentTimeMillis() - start) / 1000;
+
+        log.info("Spend " + spend + " seconds to rebalance entries.");
+    }
+
+    /**
+     * @param id Node index.
+     * @param major Major topology version.
+     * @param minor Minor topology version.
+     * @throws IgniteCheckedException If interrupted or a rebalance future failed.
+     */
+    protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException {
+        waitForRebalancing(id, new AffinityTopologyVersion(major, minor));
+    }
+
+    /**
+     * @param id Node index.
+     * @param major Major topology version.
+     * @throws IgniteCheckedException If interrupted or a rebalance future failed.
+     */
+    protected void waitForRebalancing(int id, int major) throws IgniteCheckedException {
+        waitForRebalancing(id, new AffinityTopologyVersion(major));
+    }
+
+    /**
+     * Blocks until the rebalance future of every internal cache on the node belongs to the given topology
+     * version and has completed with {@code true}. Futures that complete with {@code false} are logged and polled again.
+     *
+     * @param id Node index.
+     * @param top Topology version.
+     * @throws IgniteCheckedException If interrupted or a rebalance future failed.
+     */
+    protected void waitForRebalancing(int id, AffinityTopologyVersion top) throws IgniteCheckedException {
+        boolean finished = false;
+
+        while (!finished) {
+            finished = true;
+
+            for (GridCacheAdapter c : grid(id).context().cache().internalCaches()) {
+                GridDhtPartitionDemander.RebalanceFuture fut =
+                    (GridDhtPartitionDemander.RebalanceFuture)c.preloader().rebalanceFuture();
+
+                if (fut.topologyVersion() == null || !fut.topologyVersion().equals(top)) {
+                    finished = false;
+
+                    break;
+                }
+                else if (!fut.get()) {
+                    finished = false;
+
+                    log.warning("Rebalancing finished with missed partitions.");
+                }
+            }
+
+            // Poll with a pause instead of busy-spinning a CPU core.
+            if (!finished)
+                U.sleep(100);
+        }
+    }
+
+    /**
+     * Asserts that on every started node, for every internal cache, the supplier's {@code scMap}
+     * (read via reflection) is empty.
+     */
+    protected void checkSupplyContextMapIsEmpty() {
+        for (Ignite g : G.allGrids()) {
+            for (GridCacheAdapter c : ((IgniteEx)g).context().cache().internalCaches()) {
+                Object supplier = U.field(c.preloader(), "supplier");
+
+                Map map = U.field(supplier, "scMap");
+
+                assertTrue("Supply context map is not empty [grid=" + g.name() + ", cache=" + c.name() + ']',
+                    map.isEmpty());
+            }
+        }
+    }
+
+    /**
+     * Logs a background thread failure and records it if it is the first one.
+     *
+     * @param err Holder of the first failure.
+     * @param e Failure.
+     */
+    private void onBackgroundError(AtomicReference<Throwable> err, Throwable e) {
+        log.error("Background thread failed.", e);
+
+        err.compareAndSet(null, e);
+    }
+
+    /**
+     * Rethrows the first failure recorded by a background thread, if any.
+     *
+     * @param err Holder of the first failure.
+     * @throws Exception If a background thread failed.
+     */
+    private static void checkBackgroundError(AtomicReference<Throwable> err) throws Exception {
+        Throwable e = err.get();
+
+        if (e != null)
+            throw new Exception("Background thread failed.", e);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 5 * 60_000;
+    }
+
+    /**
+     * Starts nodes concurrently with data regeneration and dynamic cache creation, then stops nodes one by one
+     * while data is rewritten through another node. Checks data and supply contexts at each stage.
+     * Any failure in the helper threads fails the test.
+     *
+     * @throws Exception If failed.
+     */
+    public void testComplexRebalancing() throws Exception {
+        final Ignite ignite = startGrid(0);
+
+        generateData(ignite, 0, 0);
+
+        log.info("Preloading started.");
+
+        long start = System.currentTimeMillis();
+
+        concurrentStartFinished = false;
+        concurrentStartFinished2 = false;
+        concurrentStartFinished3 = false;
+
+        final AtomicReference<Throwable> err = new AtomicReference<>();
+
+        Thread t1 = new Thread() {
+            @Override public void run() {
+                try {
+                    startGrid(1);
+                    startGrid(2);
+
+                    // Wait for nodes 3 and 4; give up if another thread has already failed.
+                    while (!concurrentStartFinished2) {
+                        if (err.get() != null)
+                            return;
+
+                        U.sleep(10);
+                    }
+
+                    waitForRebalancing(0, 5, 0);
+                    waitForRebalancing(1, 5, 0);
+                    waitForRebalancing(2, 5, 0);
+                    waitForRebalancing(3, 5, 0);
+                    waitForRebalancing(4, 5, 0);
+
+                    checkSupplyContextMapIsEmpty();
+
+                    // New cache should start rebalancing.
+                    CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>();
+
+                    cacheRCfg.setName(CACHE_NAME_DHT_PARTITIONED + "_NEW");
+                    cacheRCfg.setCacheMode(CacheMode.PARTITIONED);
+                    cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+                    grid(0).getOrCreateCache(cacheRCfg);
+
+                    // Wait for the concurrent data regeneration; give up if another thread has already failed.
+                    while (!concurrentStartFinished3) {
+                        if (err.get() != null)
+                            return;
+
+                        U.sleep(10);
+                    }
+
+                    concurrentStartFinished = true;
+                }
+                catch (Throwable e) {
+                    onBackgroundError(err, e);
+                }
+            }
+        };
+
+        Thread t2 = new Thread() {
+            @Override public void run() {
+                try {
+                    startGrid(3);
+                    startGrid(4);
+
+                    concurrentStartFinished2 = true;
+                }
+                catch (Throwable e) {
+                    onBackgroundError(err, e);
+                }
+            }
+        };
+
+        Thread t3 = new Thread() {
+            @Override public void run() {
+                try {
+                    generateData(ignite, 0, 1);
+
+                    concurrentStartFinished3 = true;
+                }
+                catch (Throwable e) {
+                    onBackgroundError(err, e);
+                }
+            }
+        };
+
+        t1.start();
+        t2.start(); // Should cancel t1 rebalancing.
+        t3.start();
+
+        t1.join();
+        t2.join();
+        t3.join();
+
+        checkBackgroundError(err);
+
+        waitForRebalancing(0, 5, 1);
+        waitForRebalancing(1, 5, 1);
+        waitForRebalancing(2, 5, 1);
+        waitForRebalancing(3, 5, 1);
+        waitForRebalancing(4, 5, 1);
+
+        checkSupplyContextMapIsEmpty();
+
+        checkData(grid(4), 0, 1);
+
+        final Ignite ignite3 = grid(3);
+
+        // Rewrites the data through node 3 while nodes 0-2 are being stopped.
+        Thread t4 = new Thread() {
+            @Override public void run() {
+                try {
+                    generateData(ignite3, 0, 2);
+                }
+                catch (Throwable e) {
+                    onBackgroundError(err, e);
+                }
+            }
+        };
+
+        t4.start();
+
+        stopGrid(0);
+
+        waitForRebalancing(1, 6);
+        waitForRebalancing(2, 6);
+        waitForRebalancing(3, 6);
+        waitForRebalancing(4, 6);
+
+        stopGrid(1);
+
+        waitForRebalancing(2, 7);
+        waitForRebalancing(3, 7);
+        waitForRebalancing(4, 7);
+
+        stopGrid(2);
+
+        waitForRebalancing(3, 8);
+        waitForRebalancing(4, 8);
+
+        t4.join();
+
+        checkBackgroundError(err);
+
+        stopGrid(3);
+
+        waitForRebalancing(4, 9);
+
+        checkSupplyContextMapIsEmpty();
+
+        long spend = (System.currentTimeMillis() - start) / 1000;
+
+        checkData(grid(4), 0, 2);
+
+        log.info("Spend " + spend + " seconds to rebalance entries.");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
index c4ad169..64f1495 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
@@ -142,26 +142,6 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception If failed.
-     */
-    public void testSingleZeroPoolSize() throws Exception {
-        preloadMode = SYNC;
-        poolSize = 0;
-
-        try {
-            startGrid(1);
-
-            assert false : "Grid should have been failed to start.";
-        }
-        catch (IgniteCheckedException e) {
-            info("Caught expected exception: " + e);
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
      * @throws Exception If test failed.
      */
     public void testIntegrity() throws Exception {
@@ -602,4 +582,4 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
             // No-op.
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 0280e9c..51d8a2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -511,23 +511,6 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
-     */
-    private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
-        /** */
-        private boolean ignorePingResponse;
-
-        /** {@inheritDoc} */
-        protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
-            IgniteCheckedException {
-            if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
-                return;
-            else
-                super.writeToSocket(sock, msg, timeout);
-        }
-    }
-
-    /**
      * @throws Exception If any error occurs.
      */
     public void testNodeAdded() throws Exception {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
new file mode 100644
index 0000000..dbc54bc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -0,0 +1,46 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.net.Socket;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
+
+/**
+ *
+ */
+public class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+    /** */
+    public boolean ignorePingResponse;
+
+    /** {@inheritDoc} */
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
+        IgniteCheckedException {
+        if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
+            return;
+        else
+            super.writeToSocket(sock, msg, timeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void simulateNodeFailure() {
+        super.simulateNodeFailure();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index d133a84..41d4b4a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -77,6 +77,7 @@ import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -1228,7 +1229,7 @@ public abstract class GridAbstractTest extends TestCase {
 
         cfg.setCommunicationSpi(commSpi);
 
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+        TcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
 
         if (isDebug()) {
             discoSpi.setMaxMissedHeartbeats(Integer.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index e4c2129..a228a9a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -482,7 +482,9 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
                                 if (i == 0)
                                     start = System.currentTimeMillis();
 
-                                if (System.currentTimeMillis() - start > 30_000)
+                                if (System.currentTimeMillis() - start > 30_000) {
+                                    U.dumpThreads(log);
+
                                     throw new IgniteException("Timeout of waiting for topology map update [" +
                                         "grid=" + g.name() +
                                         ", cache=" + cfg.getName() +
@@ -491,6 +493,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
                                         ", p=" + p +
                                         ", readVer=" + readyVer +
                                         ", locNode=" + g.cluster().localNode() + ']');
+                                }
 
                                 Thread.sleep(200); // Busy wait.
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index 02a7f7f..240cc68 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -46,6 +46,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNea
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearPartitionedP2PEnabledByteArrayValuesSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePutArrayValueSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheDaemonNodeReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedAtomicGetAndTransformStoreSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedBasicApiTest;
@@ -85,6 +87,9 @@ public class IgniteCacheTestSuite3 extends TestSuite {
     public static TestSuite suite() throws Exception {
         TestSuite suite = new TestSuite("IgniteCache Test Suite part 3");
 
+        suite.addTestSuite(GridCacheRebalancingSyncSelfTest.class);
+        suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class);
+
         // Value consistency tests.
         suite.addTestSuite(GridCacheValueConsistencyAtomicSelfTest.class);
         suite.addTestSuite(GridCacheValueConsistencyAtomicPrimaryWriteOrderSelfTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
index 0226046..582bfe3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
@@ -107,25 +107,33 @@ public class GridOrderedMessageCancelSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testTask() throws Exception {
+        Map map = U.field(((IgniteKernal)grid(0)).context().io(), "msgSetMap");
+
+        int initSize =  map.size();
+
         ComputeTaskFuture<?> fut = executeAsync(compute(grid(0).cluster().forRemotes()), Task.class, null);
 
-        testMessageSet(fut);
+        testMessageSet(fut, initSize, map);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testTaskException() throws Exception {
+        Map map = U.field(((IgniteKernal)grid(0)).context().io(), "msgSetMap");
+
+        int initSize =  map.size();
+
         ComputeTaskFuture<?> fut = executeAsync(compute(grid(0).cluster().forRemotes()), FailTask.class, null);
 
-        testMessageSet(fut);
+        testMessageSet(fut, initSize, map);
     }
 
     /**
      * @param fut Future to cancel.
      * @throws Exception If failed.
      */
-    private void testMessageSet(IgniteFuture<?> fut) throws Exception {
+    private void testMessageSet(IgniteFuture<?> fut, int initSize, Map map) throws Exception {
         cancelLatch.await();
 
         assertTrue(fut.cancel());
@@ -134,11 +142,9 @@ public class GridOrderedMessageCancelSelfTest extends GridCommonAbstractTest {
 
         assertTrue(U.await(finishLatch, 5000, MILLISECONDS));
 
-        Map map = U.field(((IgniteKernal)grid(0)).context().io(), "msgSetMap");
-
         info("Map: " + map);
 
-        assertTrue(map.isEmpty());
+        assertEquals(map.size(), initSize);
     }
 
     /**


[5/5] ignite git commit: Ignite-1093

Posted by sb...@apache.org.
Ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a34a408b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a34a408b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a34a408b

Branch: refs/heads/ignite-1093-3
Commit: a34a408bf4736aca2879207037ba3bfe5d87de82
Parents: 1334e77
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Oct 27 12:00:13 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Oct 27 12:00:13 2015 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |   50 +-
 .../configuration/IgniteConfiguration.java      |   30 +-
 .../apache/ignite/internal/IgniteKernal.java    |   10 +
 .../org/apache/ignite/internal/IgnitionEx.java  |    3 +
 .../communication/GridIoMessageFactory.java     |   10 +-
 .../processors/cache/GridCacheIoManager.java    |    9 +
 .../processors/cache/GridCacheMapEntry.java     |   38 +-
 .../GridCachePartitionExchangeManager.java      |  157 ++-
 .../processors/cache/GridCachePreloader.java    |   43 +-
 .../cache/GridCachePreloaderAdapter.java        |   35 +-
 .../processors/cache/GridCacheProcessor.java    |   54 +-
 .../distributed/dht/GridDhtCacheEntry.java      |    5 +-
 .../distributed/dht/GridDhtLocalPartition.java  |   60 +-
 .../GridDhtPartitionDemandMessage.java          |    9 +-
 .../preloader/GridDhtPartitionDemandPool.java   | 1192 ----------------
 .../dht/preloader/GridDhtPartitionDemander.java | 1310 ++++++++++++++++++
 .../dht/preloader/GridDhtPartitionSupplier.java |  999 +++++++++++++
 .../GridDhtPartitionSupplyMessageV2.java        |  404 ++++++
 .../preloader/GridDhtPartitionSupplyPool.java   |  555 --------
 .../dht/preloader/GridDhtPreloader.java         |  269 +++-
 .../datastructures/DataStructuresProcessor.java |    3 +
 .../processors/task/GridTaskWorker.java         |    4 +-
 .../ignite/internal/util/lang/GridTuple4.java   |    2 +-
 .../ignite/internal/util/nio/GridNioServer.java |    2 +
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |    2 +-
 ...eAtomicInvalidPartitionHandlingSelfTest.java |    3 +
 .../GridCacheRebalancingAsyncSelfTest.java      |   63 +
 .../GridCacheRebalancingSyncSelfTest.java       |  472 +++++++
 .../GridCacheReplicatedPreloadSelfTest.java     |   22 +-
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java |   17 -
 .../spi/discovery/tcp/TestTcpDiscoverySpi.java  |   46 +
 .../testframework/junits/GridAbstractTest.java  |    3 +-
 .../junits/common/GridCommonAbstractTest.java   |    5 +-
 .../testsuites/IgniteCacheTestSuite3.java       |    5 +
 .../tcp/GridOrderedMessageCancelSelfTest.java   |   18 +-
 35 files changed, 3946 insertions(+), 1963 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 6ac2b64..4012792 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -73,6 +73,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Default rebalance timeout (ms).*/
     public static final long DFLT_REBALANCE_TIMEOUT = 10000;
 
+    /** Default rebalance batches count. */
+    public static final long DFLT_REBALANCE_BATCHES_COUNT = 2;
+
     /** Time in milliseconds to wait between rebalance messages to avoid overloading CPU. */
     public static final long DFLT_REBALANCE_THROTTLE = 0;
 
@@ -256,6 +259,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Off-heap memory size. */
     private long offHeapMaxMem = DFLT_OFFHEAP_MEMORY;
 
+    /** Rebalance batches count. */
+    private long rebalanceBatchesCount = DFLT_REBALANCE_BATCHES_COUNT;
+
     /** */
     private boolean swapEnabled = DFLT_SWAP_ENABLED;
 
@@ -396,6 +402,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         rebalanceDelay = cc.getRebalanceDelay();
         rebalanceOrder = cc.getRebalanceOrder();
         rebalancePoolSize = cc.getRebalanceThreadPoolSize();
+        rebalanceBatchesCount = cc.getRebalanceBatchesCount();
         rebalanceTimeout = cc.getRebalanceTimeout();
         rebalanceThrottle = cc.getRebalanceThrottle();
         readFromBackup = cc.isReadFromBackup();
@@ -1033,10 +1040,10 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
      * {@link CacheRebalanceMode#SYNC SYNC} or {@link CacheRebalanceMode#ASYNC ASYNC} rebalance modes only.
      * <p/>
      * If cache rebalance order is positive, rebalancing for this cache will be started only when rebalancing for
-     * all caches with smaller rebalance order (except caches with rebalance order {@code 0}) will be completed.
+     * all caches with smaller rebalance order will be completed.
      * <p/>
      * Note that cache with order {@code 0} does not participate in ordering. This means that cache with
-     * rebalance order {@code 1} will never wait for any other caches. All caches with order {@code 0} will
+     * rebalance order {@code 0} will never wait for any other caches. All caches with order {@code 0} will
      * be rebalanced right away concurrently with each other and ordered rebalance processes.
      * <p/>
      * If not set, cache order is 0, i.e. rebalancing is not ordered.
@@ -1079,7 +1086,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
      * @return {@code this} for chaining.
      */
     public CacheConfiguration<K, V> setRebalanceBatchSize(int rebalanceBatchSize) {
-        this.rebalanceBatchSize = rebalanceBatchSize;
+        this.rebalanceBatchSize = Math.max(1, rebalanceBatchSize);
 
         return this;
     }
@@ -1269,11 +1276,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         return this;
     }
 
+    @Deprecated
     /**
-     * Gets size of rebalancing thread pool. Note that size serves as a hint and implementation
-     * may create more threads for rebalancing than specified here (but never less threads).
-     * <p>
-     * Default value is {@link #DFLT_REBALANCE_THREAD_POOL_SIZE}.
+     * Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead.
      *
      * @return Size of rebalancing thread pool.
      */
@@ -1281,9 +1286,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         return rebalancePoolSize;
     }
 
+    @Deprecated
     /**
-     * Sets size of rebalancing thread pool. Note that size serves as a hint and implementation may create more threads
-     * for rebalancing than specified here (but never less threads).
+     * Use {@link IgniteConfiguration#setRebalanceThreadPoolSize(int)} instead.
      *
      * @param rebalancePoolSize Size of rebalancing thread pool.
      * @return {@code this} for chaining.
@@ -1773,6 +1778,33 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     }
 
     /**
+     * To gain better rebalancing performance, the supplier node can provide more than one batch at start and
+     * one new batch in response to each subsequent demand request.
+     *
+     * Gets the number of batches generated by the supplier node at rebalancing start.
+     *
+     * @return Batches count.
+     */
+    public long getRebalanceBatchesCount() {
+        return rebalanceBatchesCount;
+    }
+
+    /**
+     * To gain better rebalancing performance, the supplier node can provide more than one batch at start and
+     * one new batch in response to each subsequent demand request.
+     *
+     * Sets the number of batches generated by the supplier node at rebalancing start.
+     *
+     * @param rebalanceBatchesCnt Batches count.
+     * @return {@code this} for chaining.
+     */
+    public CacheConfiguration<K, V> setRebalanceBatchesCount(long rebalanceBatchesCnt) {
+        this.rebalanceBatchesCount = rebalanceBatchesCnt;
+
+        return this;
+    }
+
+    /**
      * Gets cache store session listener factories.
      *
      * @return Cache store session listener factories.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index ecae356..26145e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -149,6 +149,9 @@ public class IgniteConfiguration {
     /** Default keep alive time for public thread pool. */
     public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
 
+    /** Default limit of threads used for rebalancing. */
+    public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 2;
+
     /** Default max queue capacity of public thread pool. */
     public static final int DFLT_PUBLIC_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE;
 
@@ -354,6 +357,9 @@ public class IgniteConfiguration {
     /** Client mode flag. */
     private Boolean clientMode;
 
+    /** Rebalance thread pool size. */
+    private int rebalanceThreadPoolSize = DFLT_REBALANCE_THREAD_POOL_SIZE;
+
     /** Transactions configuration. */
     private TransactionConfiguration txCfg = new TransactionConfiguration();
 
@@ -518,6 +524,7 @@ public class IgniteConfiguration {
         utilityCachePoolSize = cfg.getUtilityCacheThreadPoolSize();
         waitForSegOnStart = cfg.isWaitForSegmentOnStart();
         warmupClos = cfg.getWarmupClosure();
+        rebalanceThreadPoolSize = cfg.getRebalanceThreadPoolSize();
     }
 
     /**
@@ -1331,6 +1338,27 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Gets the maximum number of threads that can be used for rebalancing.
+     * The returned value is never less than 1.
+     * @return Rebalance thread pool size.
+     */
+    public int getRebalanceThreadPoolSize(){
+        return Math.max(1, rebalanceThreadPoolSize);
+    }
+
+    /**
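+     * Sets the maximum number of threads that can be used for rebalancing.
+     * Values less than 1 are treated as 1 by {@link #getRebalanceThreadPoolSize()}.
+     * @param size Rebalance thread pool size.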
+     * @return {@code this} for chaining.
+     */
+    public IgniteConfiguration setRebalanceThreadPoolSize(int size){
+        this.rebalanceThreadPoolSize = size;
+
+        return this;
+    }
+
+    /**
      * Returns a collection of life-cycle beans. These beans will be automatically
      * notified of grid life-cycle events. Use life-cycle beans whenever you
      * want to perform certain logic before and after grid startup and stopping
@@ -2383,4 +2411,4 @@ public class IgniteConfiguration {
     @Override public String toString() {
         return S.toString(IgniteConfiguration.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c02dc59..da8cf3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -733,6 +733,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         ackEnvironmentVariables();
         ackCacheConfiguration();
         ackP2pConfiguration();
+        ackRebalanceConfiguration();
 
         // Run background network diagnostics.
         GridDiagnostic.runBackgroundCheck(gridName, execSvc, log);
@@ -2135,6 +2136,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     /**
      *
      */
+    private void ackRebalanceConfiguration() throws IgniteCheckedException {
+        if (cfg.getSystemThreadPoolSize() <= cfg.getRebalanceThreadPoolSize())
+            throw new IgniteCheckedException("Rebalance thread pool size exceed or equals System thread pool size. " +
+                "Change IgniteConfiguration.rebalanceThreadPoolSize property before next start.");
+    }
+
+    /**
+     *
+     */
     private void ackCacheConfiguration() {
         CacheConfiguration[] cacheCfgs = cfg.getCacheConfiguration();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 02b28c5..7d2b2dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -2035,6 +2035,7 @@ public class IgnitionEx {
             cache.setAffinity(new RendezvousAffinityFunction(false, 20));
             cache.setNodeFilter(CacheConfiguration.ALL_NODES);
             cache.setStartSize(300);
+            cache.setRebalanceOrder(-2);//Prior to other system caches.
 
             return cache;
         }
@@ -2055,6 +2056,7 @@ public class IgnitionEx {
             cache.setWriteSynchronizationMode(FULL_SYNC);
             cache.setAffinity(new RendezvousAffinityFunction(false, 100));
             cache.setNodeFilter(CacheConfiguration.ALL_NODES);
+            cache.setRebalanceOrder(-1);//Prior to user caches.
 
             return cache;
         }
@@ -2075,6 +2077,7 @@ public class IgnitionEx {
             ccfg.setWriteSynchronizationMode(FULL_SYNC);
             ccfg.setCacheMode(cfg.getCacheMode());
             ccfg.setNodeFilter(CacheConfiguration.ALL_NODES);
+            ccfg.setRebalanceOrder(-1);//Prior to user caches.
 
             if (cfg.getCacheMode() == PARTITIONED)
                 ccfg.setBackups(cfg.getBackups());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 079015c..ae8c753 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
@@ -684,7 +685,12 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            // [-3..112] - this
+            case 114:
+                msg = new GridDhtPartitionSupplyMessageV2();
+
+                break;
+
+            // [-3..114] - this
             // [120..123] - DR
             // [-4..-22] - SQL
             default:
@@ -722,4 +728,4 @@ public class GridIoMessageFactory implements MessageFactory {
 
         CUSTOM.put(type, c);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 476a96c..1fe55d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
@@ -483,6 +484,14 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             break;
 
+            case 114: {
+                GridDhtPartitionSupplyMessageV2 req = (GridDhtPartitionSupplyMessageV2)msg;
+
+                U.error(log, "Supply message v2 cannot be unmarshalled.", req.classError());
+            }
+
+            break;
+
             default:
                 throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
                     + msg + "]");

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 4bf0aa1..4e92ed4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -453,7 +453,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (cctx.swap().offheapSwapEvict(key, entry, partition(), evictVer)) {
                 assert !hasValueUnlocked() : this;
 
-                obsolete = markObsolete0(obsoleteVer, false);
+                obsolete = markObsolete0(obsoleteVer, false, null);
 
                 assert obsolete : this;
             }
@@ -1303,7 +1303,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             synchronized (this) {
                 // If entry is still removed.
                 if (newVer == ver) {
-                    if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true))) {
+                    if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) {
                         if (log.isDebugEnabled())
                             log.debug("Entry could not be marked obsolete (it is still used): " + this);
                     }
@@ -2420,7 +2420,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 try {
                     if ((!hasReaders() || readers)) {
                         // markObsolete will clear the value.
-                        if (!(marked = markObsolete0(ver, true))) {
+                        if (!(marked = markObsolete0(ver, true, null))) {
                             if (log.isDebugEnabled())
                                 log.debug("Entry could not be marked obsolete (it is still used): " + this);
 
@@ -2478,7 +2478,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         boolean obsolete;
 
         synchronized (this) {
-            obsolete = markObsolete0(ver, true);
+            obsolete = markObsolete0(ver, true, null);
         }
 
         if (obsolete)
@@ -2511,7 +2511,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         }
                     }
                     else
-                        obsolete = markObsolete0(ver, true);
+                        obsolete = markObsolete0(ver, true, null);
                 }
             }
         }
@@ -2539,7 +2539,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (!this.ver.equals(ver))
                 return false;
 
-            marked = markObsolete0(ver, true);
+            marked = markObsolete0(ver, true, null);
         }
 
         if (marked)
@@ -2555,9 +2555,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      *
      * @param ver Version.
      * @param clear {@code True} to clear.
+     * @param extras Predefined extras.
      * @return {@code True} if entry is obsolete, {@code false} if entry is still used by other threads or nodes.
      */
-    protected final boolean markObsolete0(GridCacheVersion ver, boolean clear) {
+    protected final boolean markObsolete0(GridCacheVersion ver, boolean clear, GridCacheObsoleteEntryExtras extras) {
         assert Thread.holdsLock(this);
 
         GridCacheVersion obsoleteVer = obsoleteVersionExtras();
@@ -2572,7 +2573,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (mvcc == null || mvcc.isEmpty(ver)) {
                 obsoleteVer = ver;
 
-                obsoleteVersionExtras(obsoleteVer);
+                obsoleteVersionExtras(obsoleteVer, extras);
 
                 if (clear)
                     value(null);
@@ -2896,7 +2897,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 synchronized (this) {
                     if (checkExpired()) {
-                        rmv = markObsolete0(cctx.versions().next(this.ver), true);
+                        rmv = markObsolete0(cctx.versions().next(this.ver), true, null);
 
                         return null;
                     }
@@ -3366,7 +3367,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             }
                         }
                         else {
-                            if (markObsolete0(obsoleteVer, true))
+                            if (markObsolete0(obsoleteVer, true, null))
                                 obsolete = true; // Success, will return "true".
                         }
                     }
@@ -3688,7 +3689,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                     CacheObject prev = saveOldValueUnlocked(false);
 
-                    if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+                    if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) {
                         if (swap) {
                             if (!isStartVersion()) {
                                 try {
@@ -3736,7 +3737,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                         CacheObject prevVal = saveValueForIndexUnlocked();
 
-                        if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+                        if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) {
                             if (swap) {
                                 if (!isStartVersion()) {
                                     try {
@@ -3812,7 +3813,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         GridCacheBatchSwapEntry ret = null;
 
         try {
-            if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+            if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) {
                 if (!isStartVersion() && hasValueUnlocked()) {
                     if (cctx.offheapTiered() && hasOffHeapPointer()) {
                         if (cctx.swap().offheapEvictionEnabled())
@@ -3871,7 +3872,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     return false;
 
                 if (checkExpired()) {
-                    rmv = markObsolete0(cctx.versions().next(this.ver), true);
+                    rmv = markObsolete0(cctx.versions().next(this.ver), true, null);
 
                     return false;
                 }
@@ -3984,9 +3985,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     /**
      * @param obsoleteVer Obsolete version.
      */
-    protected void obsoleteVersionExtras(@Nullable GridCacheVersion obsoleteVer) {
-        extras = (extras != null) ? extras.obsoleteVersion(obsoleteVer) : obsoleteVer != null ?
-            new GridCacheObsoleteEntryExtras(obsoleteVer) : null;
+    protected void obsoleteVersionExtras(@Nullable GridCacheVersion obsoleteVer, GridCacheObsoleteEntryExtras ext) {
+        extras = (extras != null) ?
+            extras.obsoleteVersion(obsoleteVer) :
+            obsoleteVer != null ?
+                (ext != null) ? ext : new GridCacheObsoleteEntryExtras(obsoleteVer) :
+                null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index adc2174..6793f9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,12 +21,15 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Queue;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,9 +52,11 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@ -65,8 +70,10 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
@@ -75,6 +82,7 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE;
@@ -85,6 +93,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT;
@@ -132,6 +141,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** */
     private GridFutureAdapter<?> reconnectExchangeFut;
 
+    /** */
+    private final Queue<Callable<Boolean>> rebalancingQueue = new ConcurrentLinkedDeque8<>();
+
     /**
      * Partition map futures.
      * This set also contains already completed exchange futures to address race conditions when coordinator
@@ -309,6 +321,34 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         exchWorker.futQ.addFirst(fut);
 
+        if (!cctx.kernalContext().clientNode()) {
+
+            for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
+                final int idx = cnt;
+
+                cctx.io().addOrderedHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheMessage>() {
+                    @Override public void apply(final UUID id, final GridCacheMessage m) {
+                        if (!enterBusy())
+                            return;
+
+                        try {
+                            if (m instanceof GridDhtPartitionSupplyMessageV2)
+                                cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage(
+                                    idx, id, (GridDhtPartitionSupplyMessageV2)m);
+                            else if (m instanceof GridDhtPartitionDemandMessage)
+                                cctx.cacheContext(m.cacheId).preloader().handleDemandMessage(
+                                    idx, id, (GridDhtPartitionDemandMessage)m);
+                            else
+                                log.error("Unsupported message type: " + m.getClass().getName());
+                        }
+                        finally {
+                            leaveBusy();
+                        }
+                    }
+                });
+            }
+        }
+
         new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start();
 
         if (reconnect) {
@@ -368,6 +408,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         }
     }
 
+    /**
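+     * @param idx Rebalance topic index (one ordered-message topic per rebalance thread).
+     * @return Cache topic for rebalance messages with the given index.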
+     */
+    public static Object rebalanceTopic(int idx) {
+        return TOPIC_CACHE.topic("Rebalance", idx);
+    }
+
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
         cctx.gridEvents().removeLocalEventListener(discoLsnr);
@@ -392,6 +440,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         for (AffinityReadyFuture f : readyFuts.values())
             f.onDone(stopErr);
 
+        if (!cctx.kernalContext().clientNode())
+            for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
+                cctx.io().removeOrderedHandler(rebalanceTopic(cnt));
+            }
+
         U.cancel(exchWorker);
 
         if (log.isDebugEnabled())
@@ -1103,9 +1156,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             boolean startEvtFired = false;
 
+            int cnt = 0;
+
+            IgniteInternalFuture asyncStartFut = null;
+
             while (!isCancelled()) {
                 GridDhtPartitionsExchangeFuture exchFut = null;
 
+                cnt++;
+
                 try {
                     boolean preloadFinished = true;
 
@@ -1220,12 +1279,106 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     }
 
                     if (assignsMap != null) {
+                        rebalancingQueue.clear();
+
+                        NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
+
                         for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
                             int cacheId = e.getKey();
 
                             GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 
-                            cacheCtx.preloader().addAssignments(e.getValue(), forcePreload);
+                            int order = cacheCtx.config().getRebalanceOrder();
+
+                            if (orderMap.get(order) == null)
+                                orderMap.put(order, new LinkedList<Integer>());
+
+                            orderMap.get(order).add(cacheId);
+                        }
+
+                        Callable<Boolean> marsR = null;
+
+                        //Ordered rebalance scheduling.
+                        for (Integer order : orderMap.keySet()) {
+                            for (Integer cacheId : orderMap.get(order)) {
+                                GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+                                List<String> waitList = new LinkedList<>();
+
+                                for (List<Integer> cIds : orderMap.headMap(order).values()) {
+                                    for (Integer cId : cIds) {
+                                        waitList.add(cctx.cacheContext(cId).name());
+                                    }
+                                }
+
+                                Callable<Boolean> r = cacheCtx.preloader().addAssignments(
+                                    assignsMap.get(cacheId), forcePreload, waitList, cnt);
+
+                                if (r != null) {
+                                    U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() +
+                                        ", waitList=" + waitList.toString() + "]");
+
+                                    if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME))
+                                        marsR = r;
+                                    else
+                                        rebalancingQueue.add(r);
+                                }
+                            }
+                        }
+
+                        if (asyncStartFut != null)
+                            asyncStartFut.get(); // Wait for thread stop.
+
+                        if (marsR != null || !rebalancingQueue.isEmpty()) {
+                            if (futQ.isEmpty()) {
+                                U.log(log, "Starting caches rebalancing [top=" + exchFut.topologyVersion() + "]");
+
+                                if (marsR != null)
+                                    try {
+                                        marsR.call();//Marshaller cache rebalancing launches in sync way.
+                                    }
+                                    catch (Exception ex) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to send initial demand request to node");
+
+                                        continue;
+                                    }
+
+                                final GridFutureAdapter fut = new GridFutureAdapter();
+
+                                asyncStartFut = fut;
+
+                                cctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
+                                    @Override public Boolean call() {
+                                        try {
+                                            while (true) {
+                                                Callable<Boolean> r = rebalancingQueue.poll();
+
+                                                if (r == null)
+                                                    return false;
+
+                                                if (!r.call())
+                                                    return false;
+                                            }
+                                        }
+                                        catch (Exception ex) {
+                                            if (log.isDebugEnabled())
+                                                log.debug("Failed to send initial demand request to node");
+
+                                            return false;
+                                        }
+                                        finally {
+                                            fut.onDone();
+                                        }
+                                    }
+                                }, /*system pool*/ true);
+                            }
+                            else {
+                                U.log(log, "Obsolete exchange, skipping rebalancing [top=" + exchFut.topologyVersion() + "]");
+                            }
+                        }
+                        else {
+                            U.log(log, "Nothing scheduled, skipping rebalancing [top=" + exchFut.topologyVersion() + "]");
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 755958e..79861a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -18,9 +18,14 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -90,8 +95,11 @@ public interface GridCachePreloader {
      *
      * @param assignments Assignments to add.
      * @param forcePreload Force preload flag.
+     * @param caches Names of caches whose rebalancing must finish before this cache's rebalancing starts.
+     * @param cnt Exchange worker loop iteration counter.
      */
-    public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload);
+    public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
+        Collection<String> caches, int cnt);
 
     /**
      * @param p Preload predicate.
@@ -115,6 +123,11 @@ public interface GridCachePreloader {
     public IgniteInternalFuture<?> syncFuture();
 
     /**
+     * @return Future which will complete when preloading is finished on current topology.
+     */
+    public IgniteInternalFuture<Boolean> rebalanceFuture();
+
+    /**
      * Requests that preloader sends the request for the key.
      *
      * @param keys Keys to request.
@@ -132,4 +145,30 @@ public interface GridCachePreloader {
      * Unwinds undeploys.
      */
     public void unwindUndeploys();
-}
\ No newline at end of file
+
+
+    /**
+     * Handles a partition supply message.
+     *
+     * @param idx Rebalance topic index.
+     * @param id Node ID.
+     * @param s Supply message.
+     */
+    public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessageV2 s);
+
+    /**
+     * Handles a partition demand message.
+     *
+     * @param idx Rebalance topic index.
+     * @param id Node ID.
+     * @param d Demand message.
+     */
+    public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d);
+
+    /**
+     * Evicts partition asynchronously.
+     *
+     * @param part Partition to evict.
+     */
+    public void evictPartitionAsync(GridDhtLocalPartition part);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 5405449..b784383 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -18,11 +18,16 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -36,7 +41,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     /** Cache context. */
     protected final GridCacheContext<?, ?> cctx;
 
-    /** Logger.*/
+    /** Logger. */
     protected final IgniteLogger log;
 
     /** Affinity. */
@@ -113,12 +118,28 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
+        return new GridFinishedFuture<>(true); // Nothing to wait for in this adapter: return an already completed future.
+    }
+
+    /** {@inheritDoc} */
     @Override public void unwindUndeploys() {
         cctx.deploy().unwind(cctx);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+    @Override public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessageV2 s) {
+        // No-op: this adapter ignores supply messages.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
+        // No-op: this adapter ignores demand messages.
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys,
+        AffinityTopologyVersion topVer) {
         return new GridFinishedFuture<>();
     }
 
@@ -143,7 +164,13 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
+    @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
+        Collection<String> caches, int cnt) {
+        return null; // No rebalancing task to schedule for this adapter.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
         // No-op.
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 722e570..c2acd99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -31,9 +31,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -99,7 +97,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManag
 import org.apache.ignite.internal.processors.plugin.CachePluginManager;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.F0;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -162,12 +159,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Map of proxies. */
     private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies;
 
-    /** Map of preload finish futures grouped by preload order. */
-    private final NavigableMap<Integer, IgniteInternalFuture<?>> preloadFuts;
-
-    /** Maximum detected rebalance order. */
-    private int maxRebalanceOrder;
-
     /** Caches stop sequence. */
     private final Deque<String> stopSeq;
 
@@ -209,7 +200,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         caches = new ConcurrentHashMap<>();
         jCacheProxies = new ConcurrentHashMap<>();
-        preloadFuts = new TreeMap<>();
 
         stopSeq = new LinkedList<>();
     }
@@ -392,10 +382,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             U.warn(log, "AffinityFunction configuration parameter will be ignored for local cache [cacheName=" +
                 U.maskName(cc.getName()) + ']');
 
-        if (cc.getRebalanceMode() != CacheRebalanceMode.NONE) {
-            assertParameter(cc.getRebalanceThreadPoolSize() > 0, "rebalanceThreadPoolSize > 0");
+        if (cc.getRebalanceMode() != CacheRebalanceMode.NONE)
             assertParameter(cc.getRebalanceBatchSize() > 0, "rebalanceBatchSize > 0");
-        }
 
         if (cc.getCacheMode() == PARTITIONED || cc.getCacheMode() == REPLICATED) {
             if (cc.getAtomicityMode() == ATOMIC && cc.getWriteSynchronizationMode() == FULL_ASYNC)
@@ -614,8 +602,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     "Deployment mode for cache is not CONTINUOUS or SHARED.");
         }
 
-        maxRebalanceOrder = validatePreloadOrder(ctx.config().getCacheConfiguration());
-
         ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class,
             new CustomEventListener<DynamicCacheChangeBatch>() {
                 @Override public void onCustomEvent(ClusterNode snd,
@@ -846,31 +832,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
             mgr.onKernalStart(false);
 
-        for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
-            GridCacheAdapter cache = e.getValue();
-
-            if (maxRebalanceOrder > 0) {
-                CacheConfiguration cfg = cache.configuration();
-
-                int order = cfg.getRebalanceOrder();
-
-                if (order > 0 && order != maxRebalanceOrder && cfg.getCacheMode() != LOCAL) {
-                    GridCompoundFuture fut = (GridCompoundFuture)preloadFuts.get(order);
-
-                    if (fut == null) {
-                        fut = new GridCompoundFuture<>();
-
-                        preloadFuts.put(order, fut);
-                    }
-
-                    fut.add(cache.preloader().syncFuture());
-                }
-            }
-        }
-
-        for (IgniteInternalFuture<?> fut : preloadFuts.values())
-            ((GridCompoundFuture<Object, Object>)fut).markInitialized();
-
         for (GridCacheAdapter<?, ?> cache : caches.values())
             onKernalStart(cache);
 
@@ -2779,19 +2740,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Gets preload finish future for preload-ordered cache with given order. I.e. will get compound preload future
-     * with maximum order less than {@code order}.
-     *
-     * @param order Cache order.
-     * @return Compound preload future or {@code null} if order is minimal order found.
-     */
-    @Nullable public IgniteInternalFuture<?> orderedPreloadFuture(int order) {
-        Map.Entry<Integer, IgniteInternalFuture<?>> entry = preloadFuts.lowerEntry(order);
-
-        return entry == null ? null : entry.getValue();
-    }
-
-    /**
      * @param spaceName Space name.
      * @param keyBytes Key bytes.
      * @param valBytes Value bytes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index be2f3d3..b2279ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
+import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
@@ -539,7 +540,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
      * @return {@code True} if entry was not being used, passed the filter and could be removed.
      * @throws IgniteCheckedException If failed to remove from swap.
      */
-    public boolean clearInternal(GridCacheVersion ver, boolean swap) throws IgniteCheckedException {
+    public boolean clearInternal(GridCacheVersion ver, boolean swap, GridCacheObsoleteEntryExtras extras) throws IgniteCheckedException {
         boolean rmv = false;
 
         try {
@@ -548,7 +549,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
 
                 // Call markObsolete0 to avoid recursive calls to clear if
                 // we are clearing dht local partition (onMarkedObsolete should not be called).
-                if (!markObsolete0(ver, false)) {
+                if (!markObsolete0(ver, false, extras)) {
                     if (log.isDebugEnabled())
                         log.debug("Entry could not be marked obsolete (it is still used or has readers): " + this);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 4f124e6..b3c13a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -17,6 +17,17 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicStampedReference;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -27,10 +38,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.GridCircularBuffer;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -38,7 +49,6 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -46,18 +56,6 @@ import org.jetbrains.annotations.NotNull;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.LongAdder8;
 
-import javax.cache.CacheException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicStampedReference;
-import java.util.concurrent.locks.ReentrantLock;
-
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
@@ -286,7 +284,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
         }
 
         // Attempt to evict.
-        tryEvict(true);
+        cctx.preloader().evictPartitionAsync(this);
     }
 
     /**
@@ -411,7 +409,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
             // Decrement reservations.
             if (state.compareAndSet(s, s, reservations, --reservations)) {
-                tryEvict(true);
+                cctx.preloader().evictPartitionAsync(this);
 
                 break;
             }
@@ -479,7 +477,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      * @param updateSeq Update sequence.
      * @return Future for evict attempt.
      */
-    IgniteInternalFuture<Boolean> tryEvictAsync(boolean updateSeq) {
+    void tryEvictAsync(boolean updateSeq) {
         if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) &&
             state.compareAndSet(RENTING, EVICTED, 0, 0)) {
             if (log.isDebugEnabled())
@@ -497,15 +495,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
             ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
 
             clearDeferredDeletes();
-
-            return new GridFinishedFuture<>(true);
         }
-
-        return cctx.closures().callLocalSafe(new GPC<Boolean>() {
-            @Override public Boolean call() {
-                return tryEvict(true);
-            }
-        }, /*system pool*/ true);
+        else {
+            cctx.preloader().evictPartitionAsync(this);
+        }
     }
 
     /**
@@ -521,12 +514,11 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     }
 
     /**
-     * @param updateSeq Update sequence.
      * @return {@code True} if entry has been transitioned to state EVICTED.
      */
-    boolean tryEvict(boolean updateSeq) {
+    public void tryEvict() {
         if (state.getReference() != RENTING || state.getStamp() != 0 || groupReserved())
-            return false;
+            return;
 
         // Attempt to evict partition entries from cache.
         clearAll();
@@ -545,14 +537,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
             rent.onDone();
 
-            ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
+            ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, true);
 
             clearDeferredDeletes();
-
-            return true;
         }
-
-        return false;
     }
 
     /**
@@ -592,7 +580,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      *
      */
     void onUnlock() {
-        tryEvict(true);
+        cctx.preloader().evictPartitionAsync(this);
     }
 
     /**
@@ -632,6 +620,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
                 it = F.concat(it, unswapIt);
         }
 
+        GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer);
+
         try {
             while (it.hasNext()) {
                 GridDhtCacheEntry cached = null;
@@ -639,7 +629,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
                 try {
                     cached = it.next();
 
-                    if (cached.clearInternal(clearVer, swap)) {
+                    if (cached.clearInternal(clearVer, swap, extras)) {
                         map.remove(cached.key(), cached);
 
                         if (!cached.isInternal()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 848ad87..885b0dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -116,6 +116,13 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
     }
 
     /**
+     * @param updateSeq New update sequence value.
+     */
+    void updateSequence(long updateSeq) {
+        this.updateSeq = updateSeq;
+    }
+
+    /**
      * @return Update sequence.
      */
     long updateSequence() {
@@ -320,7 +327,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts.size(), "super",
+        return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts != null ? parts.size() : 0, "super",
             super.toString());
     }
 }


[4/5] ignite git commit: Ignite-1093

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
deleted file mode 100644
index e993a88..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ /dev/null
@@ -1,1192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheRebalanceMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.GridLeanSet;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.Nullable;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
-import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
-import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
-import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
-import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
-
-/**
- * Thread pool for requesting partitions from other nodes
- * and populating local cache.
- */
-@SuppressWarnings("NonConstantFieldWithUpperCaseName")
-public class GridDhtPartitionDemandPool {
-    /** Dummy message to wake up a blocking queue if a node leaves. */
-    private final SupplyMessage DUMMY_TOP = new SupplyMessage();
-
-    /** */
-    private final GridCacheContext<?, ?> cctx;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private final ReadWriteLock busyLock;
-
-    /** */
-    @GridToStringInclude
-    private final Collection<DemandWorker> dmdWorkers;
-
-    /** Preload predicate. */
-    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
-    /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
-    @GridToStringInclude
-    private SyncFuture syncFut;
-
-    /** Preload timeout. */
-    private final AtomicLong timeout;
-
-    /** Allows demand threads to synchronize their step. */
-    private CyclicBarrier barrier;
-
-    /** Demand lock. */
-    private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
-
-    /** */
-    private int poolSize;
-
-    /** Last timeout object. */
-    private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
-
-    /** Last exchange future. */
-    private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
-
-    /**
-     * @param cctx Cache context.
-     * @param busyLock Shutdown lock.
-     */
-    public GridDhtPartitionDemandPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
-        assert cctx != null;
-        assert busyLock != null;
-
-        this.cctx = cctx;
-        this.busyLock = busyLock;
-
-        log = cctx.logger(getClass());
-
-        boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
-
-        poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
-        if (enabled) {
-            barrier = new CyclicBarrier(poolSize);
-
-            dmdWorkers = new ArrayList<>(poolSize);
-
-            for (int i = 0; i < poolSize; i++)
-                dmdWorkers.add(new DemandWorker(i));
-
-            syncFut = new SyncFuture(dmdWorkers);
-        }
-        else {
-            dmdWorkers = Collections.emptyList();
-
-            syncFut = new SyncFuture(dmdWorkers);
-
-            // Calling onDone() immediately since preloading is disabled.
-            syncFut.onDone();
-        }
-
-        timeout = new AtomicLong(cctx.config().getRebalanceTimeout());
-    }
-
-    /**
-     *
-     */
-    void start() {
-        if (poolSize > 0) {
-            for (DemandWorker w : dmdWorkers)
-                new IgniteThread(cctx.gridName(), "preloader-demand-worker", w).start();
-        }
-    }
-
-    /**
-     *
-     */
-    void stop() {
-        U.cancel(dmdWorkers);
-
-        if (log.isDebugEnabled())
-            log.debug("Before joining on demand workers: " + dmdWorkers);
-
-        U.join(dmdWorkers, log);
-
-        if (log.isDebugEnabled())
-            log.debug("After joining on demand workers: " + dmdWorkers);
-
-        lastExchangeFut = null;
-
-        lastTimeoutObj.set(null);
-    }
-
-    /**
-     * @return Future for {@link CacheRebalanceMode#SYNC} mode.
-     */
-    IgniteInternalFuture<?> syncFuture() {
-        return syncFut;
-    }
-
-    /**
-     * Sets preload predicate for demand pool.
-     *
-     * @param preloadPred Preload predicate.
-     */
-    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
-        this.preloadPred = preloadPred;
-    }
-
-    /**
-     * @return Size of this thread pool.
-     */
-    int poolSize() {
-        return poolSize;
-    }
-
-    /**
-     * Wakes up demand workers when new exchange future was added.
-     */
-    void onExchangeFutureAdded() {
-        synchronized (dmdWorkers) {
-            for (DemandWorker w : dmdWorkers)
-                w.addMessage(DUMMY_TOP);
-        }
-    }
-
-    /**
-     * Force preload.
-     */
-    void forcePreload() {
-        GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
-
-        if (obj != null)
-            cctx.time().removeTimeoutObject(obj);
-
-        final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
-
-        if (exchFut != null) {
-            if (log.isDebugEnabled())
-                log.debug("Forcing rebalance event for future: " + exchFut);
-
-            exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                    cctx.shared().exchange().forcePreloadExchange(exchFut);
-                }
-            });
-        }
-        else if (log.isDebugEnabled())
-            log.debug("Ignoring force rebalance request (no topology event happened yet).");
-    }
-
-    /**
-     * @return {@code true} if entered to busy state.
-     */
-    private boolean enterBusy() {
-        if (busyLock.readLock().tryLock())
-            return true;
-
-        if (log.isDebugEnabled())
-            log.debug("Failed to enter to busy state (demander is stopping): " + cctx.nodeId());
-
-        return false;
-    }
-
-    /**
-     *
-     */
-    private void leaveBusy() {
-        busyLock.readLock().unlock();
-    }
-
-    /**
-     * @param type Type.
-     * @param discoEvt Discovery event.
-     */
-    private void preloadEvent(int type, DiscoveryEvent discoEvt) {
-        preloadEvent(-1, type, discoEvt);
-    }
-
-    /**
-     * @param part Partition.
-     * @param type Type.
-     * @param discoEvt Discovery event.
-     */
-    private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
-        assert discoEvt != null;
-
-        cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
-    }
-
-    /**
-     * @param msg Message to check.
-     * @return {@code True} if dummy message.
-     */
-    private boolean dummyTopology(SupplyMessage msg) {
-        return msg == DUMMY_TOP;
-    }
-
-    /**
-     * @param deque Deque to poll from.
-     * @param time Time to wait.
-     * @param w Worker.
-     * @return Polled item.
-     * @throws InterruptedException If interrupted.
-     */
-    @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
-        assert w != null;
-
-        // There is currently a case where {@code interrupted}
-        // flag on a thread gets flipped during stop which causes the pool to hang.  This check
-        // will always make sure that interrupted flag gets reset before going into wait conditions.
-        // The true fix should actually make sure that interrupted flag does not get reset or that
-        // interrupted exception gets propagated. Until we find a real fix, this method should
-        // always work to make sure that there is no hanging during stop.
-        if (w.isCancelled())
-            Thread.currentThread().interrupt();
-
-        return deque.poll(time, MILLISECONDS);
-    }
-
-    /**
-     * @param p Partition.
-     * @param topVer Topology version.
-     * @return Picked owners.
-     */
-    private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
-        Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
-
-        int affCnt = affNodes.size();
-
-        Collection<ClusterNode> rmts = remoteOwners(p, topVer);
-
-        int rmtCnt = rmts.size();
-
-        if (rmtCnt <= affCnt)
-            return rmts;
-
-        List<ClusterNode> sorted = new ArrayList<>(rmts);
-
-        // Sort in descending order, so nodes with higher order will be first.
-        Collections.sort(sorted, CU.nodeComparator(false));
-
-        // Pick newest nodes.
-        return sorted.subList(0, affCnt);
-    }
-
-    /**
-     * @param p Partition.
-     * @param topVer Topology version.
-     * @return Nodes owning this partition.
-     */
-    private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
-        return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
-    }
-
-    /**
-     * @param assigns Assignments.
-     * @param force {@code True} if dummy reassign.
-     */
-    void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
-        if (log.isDebugEnabled())
-            log.debug("Adding partition assignments: " + assigns);
-
-        long delay = cctx.config().getRebalanceDelay();
-
-        if (delay == 0 || force) {
-            assert assigns != null;
-
-            synchronized (dmdWorkers) {
-                for (DemandWorker w : dmdWorkers) {
-                    w.addAssignments(assigns);
-
-                    w.addMessage(DUMMY_TOP);
-                }
-            }
-        }
-        else if (delay > 0) {
-            assert !force;
-
-            GridTimeoutObject obj = lastTimeoutObj.get();
-
-            if (obj != null)
-                cctx.time().removeTimeoutObject(obj);
-
-            final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
-
-            assert exchFut != null : "Delaying rebalance process without topology event.";
-
-            obj = new GridTimeoutObjectAdapter(delay) {
-                @Override public void onTimeout() {
-                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
-                            cctx.shared().exchange().forcePreloadExchange(exchFut);
-                        }
-                    });
-                }
-            };
-
-            lastTimeoutObj.set(obj);
-
-            cctx.time().addTimeoutObject(obj);
-        }
-    }
-
-    /**
-     *
-     */
-    void unwindUndeploys() {
-        demandLock.writeLock().lock();
-
-        try {
-            cctx.deploy().unwind(cctx);
-        }
-        finally {
-            demandLock.writeLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtPartitionDemandPool.class, this);
-    }
-
-    /**
-     *
-     */
-    private class DemandWorker extends GridWorker {
-        /** Worker ID. */
-        private int id;
-
-        /** Partition-to-node assignments. */
-        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
-
-        /** Message queue. */
-        private final LinkedBlockingDeque<SupplyMessage> msgQ =
-            new LinkedBlockingDeque<>();
-
-        /** Counter. */
-        private long cntr;
-
-        /** Hide worker logger and use cache logger instead. */
-        private IgniteLogger log = GridDhtPartitionDemandPool.this.log;
-
-        /**
-         * @param id Worker ID.
-         */
-        private DemandWorker(int id) {
-            super(cctx.gridName(), "preloader-demand-worker", GridDhtPartitionDemandPool.this.log);
-
-            assert id >= 0;
-
-            this.id = id;
-        }
-
-        /**
-         * @param assigns Assignments.
-         */
-        void addAssignments(GridDhtPreloaderAssignments assigns) {
-            assert assigns != null;
-
-            assignQ.offer(assigns);
-
-            if (log.isDebugEnabled())
-                log.debug("Added assignments to worker: " + this);
-        }
-
-        /**
-         * @return {@code True} if topology changed.
-         */
-        private boolean topologyChanged() {
-            return !assignQ.isEmpty() || cctx.shared().exchange().topologyChanged();
-        }
-
-        /**
-         * @param msg Message.
-         */
-        private void addMessage(SupplyMessage msg) {
-            if (!enterBusy())
-                return;
-
-            try {
-                assert dummyTopology(msg) || msg.supply().workerId() == id;
-
-                msgQ.offer(msg);
-            }
-            finally {
-                leaveBusy();
-            }
-        }
-
-        /**
-         * @param timeout Timed out value.
-         */
-        private void growTimeout(long timeout) {
-            long newTimeout = (long)(timeout * 1.5D);
-
-            // Account for overflow.
-            if (newTimeout < 0)
-                newTimeout = Long.MAX_VALUE;
-
-            // Grow by 50% only if another thread didn't do it already.
-            if (GridDhtPartitionDemandPool.this.timeout.compareAndSet(timeout, newTimeout))
-                U.warn(log, "Increased rebalancing message timeout from " + timeout + "ms to " +
-                    newTimeout + "ms.");
-        }
-
-        /**
-         * @param pick Node picked for preloading.
-         * @param p Partition.
-         * @param entry Preloaded entry.
-         * @param topVer Topology version.
-         * @return {@code False} if partition has become invalid during preloading.
-         * @throws IgniteInterruptedCheckedException If interrupted.
-         */
-        private boolean preloadEntry(
-            ClusterNode pick,
-            int p,
-            GridCacheEntryInfo entry,
-            AffinityTopologyVersion topVer
-        ) throws IgniteCheckedException {
-            try {
-                GridCacheEntryEx cached = null;
-
-                try {
-                    cached = cctx.dht().entryEx(entry.key());
-
-                    if (log.isDebugEnabled())
-                        log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
-
-                    if (cctx.dht().isIgfsDataCache() &&
-                        cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
-                        LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
-                            "value, will ignore rebalance entries): " + name());
-
-                        if (cached.markObsoleteIfEmpty(null))
-                            cached.context().cache().removeIfObsolete(cached.key());
-
-                        return true;
-                    }
-
-                    if (preloadPred == null || preloadPred.apply(entry)) {
-                        if (cached.initialValue(
-                            entry.value(),
-                            entry.version(),
-                            entry.ttl(),
-                            entry.expireTime(),
-                            true,
-                            topVer,
-                            cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
-                        )) {
-                            cctx.evicts().touch(cached, topVer); // Start tracking.
-
-                            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
-                                cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
-                                    (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
-                                    false, null, null, null);
-                        }
-                        else if (log.isDebugEnabled())
-                            log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
-                                ", part=" + p + ']');
-                    }
-                    else if (log.isDebugEnabled())
-                        log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
-                }
-                catch (GridCacheEntryRemovedException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
-                            cached.key() + ", part=" + p + ']');
-                }
-                catch (GridDhtInvalidPartitionException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Partition became invalid during rebalancing (will ignore): " + p);
-
-                    return false;
-                }
-            }
-            catch (IgniteInterruptedCheckedException e) {
-                throw e;
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
-                    cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
-            }
-
-            return true;
-        }
-
-        /**
-         * @param idx Unique index for this topic.
-         * @return Topic for partition.
-         */
-        public Object topic(long idx) {
-            return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
-        }
-
-        /**
-         * @param node Node to demand from.
-         * @param topVer Topology version.
-         * @param d Demand message.
-         * @param exchFut Exchange future.
-         * @return Missed partitions.
-         * @throws InterruptedException If interrupted.
-         * @throws ClusterTopologyCheckedException If node left.
-         * @throws IgniteCheckedException If failed to send message.
-         */
-        private Set<Integer> demandFromNode(
-            ClusterNode node,
-            final AffinityTopologyVersion topVer,
-            GridDhtPartitionDemandMessage d,
-            GridDhtPartitionsExchangeFuture exchFut
-        ) throws InterruptedException, IgniteCheckedException {
-            GridDhtPartitionTopology top = cctx.dht().topology();
-
-            cntr++;
-
-            d.topic(topic(cntr));
-            d.workerId(id);
-
-            Set<Integer> missed = new HashSet<>();
-
-            // Get the same collection that will be sent in the message.
-            Collection<Integer> remaining = d.partitions();
-
-            // Drain queue before processing a new node.
-            drainQueue();
-
-            if (isCancelled() || topologyChanged())
-                return missed;
-
-            cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
-                    addMessage(new SupplyMessage(nodeId, msg));
-                }
-            });
-
-            try {
-                boolean retry;
-
-                // DoWhile.
-                // =======
-                do {
-                    retry = false;
-
-                    // Create copy.
-                    d = new GridDhtPartitionDemandMessage(d, remaining);
-
-                    long timeout = GridDhtPartitionDemandPool.this.timeout.get();
-
-                    d.timeout(timeout);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
-
-                    // Send demand message.
-                    cctx.io().send(node, d, cctx.ioPolicy());
-
-                    // While.
-                    // =====
-                    while (!isCancelled() && !topologyChanged()) {
-                        SupplyMessage s = poll(msgQ, timeout, this);
-
-                        // If timed out.
-                        if (s == null) {
-                            if (msgQ.isEmpty()) { // Safety check.
-                                U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
-                                    " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
-                                    " configuration properties).");
-
-                                growTimeout(timeout);
-
-                                // Ordered listener was removed if timeout expired.
-                                cctx.io().removeOrderedHandler(d.topic());
-
-                                // Must create copy to be able to work with IO manager thread local caches.
-                                d = new GridDhtPartitionDemandMessage(d, remaining);
-
-                                // Create new topic.
-                                d.topic(topic(++cntr));
-
-                                // Create new ordered listener.
-                                cctx.io().addOrderedHandler(d.topic(),
-                                    new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                                        @Override public void apply(UUID nodeId,
-                                            GridDhtPartitionSupplyMessage msg) {
-                                            addMessage(new SupplyMessage(nodeId, msg));
-                                        }
-                                    });
-
-                                // Resend message with larger timeout.
-                                retry = true;
-
-                                break; // While.
-                            }
-                            else
-                                continue; // While.
-                        }
-
-                        // If topology changed.
-                        if (dummyTopology(s)) {
-                            if (topologyChanged())
-                                break; // While.
-                            else
-                                continue; // While.
-                        }
-
-                        // Check that message was received from expected node.
-                        if (!s.senderId().equals(node.id())) {
-                            U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
-                                ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
-
-                            continue; // While.
-                        }
-
-                        if (log.isDebugEnabled())
-                            log.debug("Received supply message: " + s);
-
-                        GridDhtPartitionSupplyMessage supply = s.supply();
-
-                        // Check whether there were class loading errors on unmarshal
-                        if (supply.classError() != null) {
-                            if (log.isDebugEnabled())
-                                log.debug("Class got undeployed during preloading: " + supply.classError());
-
-                            retry = true;
-
-                            // Quit preloading.
-                            break;
-                        }
-
-                        // Preload.
-                        for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
-                            int p = e.getKey();
-
-                            if (cctx.affinity().localNode(p, topVer)) {
-                                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
-                                assert part != null;
-
-                                if (part.state() == MOVING) {
-                                    boolean reserved = part.reserve();
-
-                                    assert reserved : "Failed to reserve partition [gridName=" +
-                                        cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
-
-                                    part.lock();
-
-                                    try {
-                                        Collection<Integer> invalidParts = new GridLeanSet<>();
-
-                                        // Loop through all received entries and try to preload them.
-                                        for (GridCacheEntryInfo entry : e.getValue().infos()) {
-                                            if (!invalidParts.contains(p)) {
-                                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
-                                                    if (log.isDebugEnabled())
-                                                        log.debug("Preloading is not permitted for entry due to " +
-                                                            "evictions [key=" + entry.key() +
-                                                            ", ver=" + entry.version() + ']');
-
-                                                    continue;
-                                                }
-
-                                                if (!preloadEntry(node, p, entry, topVer)) {
-                                                    invalidParts.add(p);
-
-                                                    if (log.isDebugEnabled())
-                                                        log.debug("Got entries for invalid partition during " +
-                                                            "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
-                                                }
-                                            }
-                                        }
-
-                                        boolean last = supply.last().contains(p);
-
-                                        // If message was last for this partition,
-                                        // then we take ownership.
-                                        if (last) {
-                                            remaining.remove(p);
-
-                                            top.own(part);
-
-                                            if (log.isDebugEnabled())
-                                                log.debug("Finished rebalancing partition: " + part);
-
-                                            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
-                                                preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
-                                                    exchFut.discoveryEvent());
-                                        }
-                                    }
-                                    finally {
-                                        part.unlock();
-                                        part.release();
-                                    }
-                                }
-                                else {
-                                    remaining.remove(p);
-
-                                    if (log.isDebugEnabled())
-                                        log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
-                                }
-                            }
-                            else {
-                                remaining.remove(p);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
-                            }
-                        }
-
-                        remaining.removeAll(s.supply().missed());
-
-                        // Only request partitions based on latest topology version.
-                        for (Integer miss : s.supply().missed())
-                            if (cctx.affinity().localNode(miss, topVer))
-                                missed.add(miss);
-
-                        if (remaining.isEmpty())
-                            break; // While.
-
-                        if (s.supply().ack()) {
-                            retry = true;
-
-                            break;
-                        }
-                    }
-                }
-                while (retry && !isCancelled() && !topologyChanged());
-
-                return missed;
-            }
-            finally {
-                cctx.io().removeOrderedHandler(d.topic());
-            }
-        }
-
-        /**
-         * @throws InterruptedException If interrupted.
-         */
-        private void drainQueue() throws InterruptedException {
-            while (!msgQ.isEmpty()) {
-                SupplyMessage msg = msgQ.take();
-
-                if (log.isDebugEnabled())
-                    log.debug("Drained supply message: " + msg);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            try {
-                int rebalanceOrder = cctx.config().getRebalanceOrder();
-
-                if (!CU.isMarshallerCache(cctx.name())) {
-                    if (log.isDebugEnabled())
-                        log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
-
-                    try {
-                        cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
-                    }
-                    catch (IgniteInterruptedCheckedException ignored) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
-                                "[cacheName=" + cctx.name() + ']');
-
-                        return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
-                    }
-                }
-
-                if (rebalanceOrder > 0) {
-                    IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
-
-                    try {
-                        if (fut != null) {
-                            if (log.isDebugEnabled())
-                                log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
-                                    ", rebalanceOrder=" + rebalanceOrder + ']');
-
-                            fut.get();
-                        }
-                    }
-                    catch (IgniteInterruptedCheckedException ignored) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
-                                "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
-
-                        return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
-                    }
-                }
-
-                GridDhtPartitionsExchangeFuture exchFut = null;
-
-                boolean stopEvtFired = false;
-
-                while (!isCancelled()) {
-                    try {
-                        barrier.await();
-
-                        if (id == 0 && exchFut != null && !exchFut.dummy() &&
-                            cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED)) {
-
-                            if (!cctx.isReplicated() || !stopEvtFired) {
-                                preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
-                                stopEvtFired = true;
-                            }
-                        }
-                    }
-                    catch (BrokenBarrierException ignore) {
-                        throw new InterruptedException("Demand worker stopped.");
-                    }
-
-                    // Sync up all demand threads at this step.
-                    GridDhtPreloaderAssignments assigns = null;
-
-                    while (assigns == null)
-                        assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
-
-                    demandLock.readLock().lock();
-
-                    try {
-                        exchFut = assigns.exchangeFuture();
-
-                        // Assignments are empty if preloading is disabled.
-                        if (assigns.isEmpty())
-                            continue;
-
-                        boolean resync = false;
-
-                        // While.
-                        // =====
-                        while (!isCancelled() && !topologyChanged() && !resync) {
-                            Collection<Integer> missed = new HashSet<>();
-
-                            // For.
-                            // ===
-                            for (ClusterNode node : assigns.keySet()) {
-                                if (topologyChanged() || isCancelled())
-                                    break; // For.
-
-                                GridDhtPartitionDemandMessage d = assigns.remove(node);
-
-                                // If another thread is already processing this message,
-                                // move to the next node.
-                                if (d == null)
-                                    continue; // For.
-
-                                try {
-                                    Set<Integer> set = demandFromNode(node, assigns.topologyVersion(), d, exchFut);
-
-                                    if (!set.isEmpty()) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" +
-                                                set + ']');
-
-                                        missed.addAll(set);
-                                    }
-                                }
-                                catch (IgniteInterruptedCheckedException e) {
-                                    throw e;
-                                }
-                                catch (ClusterTopologyCheckedException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
-                                            ", msg=" + e.getMessage() + ']');
-
-                                    resync = true;
-
-                                    break; // For.
-                                }
-                                catch (IgniteCheckedException e) {
-                                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
-                                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
-                                }
-                            }
-
-                            // Processed missed entries.
-                            if (!missed.isEmpty()) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Reassigning partitions that were missed: " + missed);
-
-                                assert exchFut.exchangeId() != null;
-
-                                cctx.shared().exchange().forceDummyExchange(true, exchFut);
-                            }
-                            else
-                                break; // While.
-                        }
-                    }
-                    finally {
-                        demandLock.readLock().unlock();
-
-                        syncFut.onWorkerDone(this);
-                    }
-
-                    cctx.shared().exchange().scheduleResendPartitions();
-                }
-            }
-            finally {
-                // Safety.
-                syncFut.onWorkerDone(this);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
-        }
-    }
-
-    /**
-     * Sets last exchange future.
-     *
-     * @param lastFut Last future to set.
-     */
-    void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
-        lastExchangeFut = lastFut;
-    }
-
-    /**
-     * @param exchFut Exchange future.
-     * @return Assignments of partitions to nodes.
-     */
-    GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
-        // No assignments for disabled preloader.
-        GridDhtPartitionTopology top = cctx.dht().topology();
-
-        if (!cctx.rebalanceEnabled())
-            return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
-        int partCnt = cctx.affinity().partitions();
-
-        assert exchFut.forcePreload() || exchFut.dummyReassign() ||
-            exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
-            "Topology version mismatch [exchId=" + exchFut.exchangeId() +
-                ", topVer=" + top.topologyVersion() + ']';
-
-        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
-        AffinityTopologyVersion topVer = assigns.topologyVersion();
-
-        for (int p = 0; p < partCnt; p++) {
-            if (cctx.shared().exchange().hasPendingExchange()) {
-                if (log.isDebugEnabled())
-                    log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
-                        exchFut.exchangeId());
-
-                break;
-            }
-
-            // If partition belongs to local node.
-            if (cctx.affinity().localNode(p, topVer)) {
-                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
-                assert part != null;
-                assert part.id() == p;
-
-                if (part.state() != MOVING) {
-                    if (log.isDebugEnabled())
-                        log.debug("Skipping partition assignment (state is not MOVING): " + part);
-
-                    continue; // For.
-                }
-
-                Collection<ClusterNode> picked = pickedOwners(p, topVer);
-
-                if (picked.isEmpty()) {
-                    top.own(part);
-
-                    if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
-                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
-                        cctx.events().addPreloadEvent(p,
-                            EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
-                            discoEvt.type(), discoEvt.timestamp());
-                    }
-
-                    if (log.isDebugEnabled())
-                        log.debug("Owning partition as there are no other owners: " + part);
-                }
-                else {
-                    ClusterNode n = F.first(picked);
-
-                    GridDhtPartitionDemandMessage msg = assigns.get(n);
-
-                    if (msg == null) {
-                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
-                            top.updateSequence(),
-                            exchFut.exchangeId().topologyVersion(),
-                            cctx.cacheId()));
-                    }
-
-                    msg.addPartition(p);
-                }
-            }
-        }
-
-        return assigns;
-    }
-
-    /**
-     *
-     */
-    private class SyncFuture extends GridFutureAdapter<Object> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Remaining workers. */
-        private Collection<DemandWorker> remaining;
-
-        /**
-         * @param workers List of workers.
-         */
-        private SyncFuture(Collection<DemandWorker> workers) {
-            assert workers.size() == poolSize();
-
-            remaining = Collections.synchronizedList(new LinkedList<>(workers));
-        }
-
-        /**
-         * @param w Worker who iterated through all partitions.
-         */
-        void onWorkerDone(DemandWorker w) {
-            if (isDone())
-                return;
-
-            if (remaining.remove(w))
-                if (log.isDebugEnabled())
-                    log.debug("Completed full partition iteration for worker [worker=" + w + ']');
-
-            if (remaining.isEmpty()) {
-                if (log.isDebugEnabled())
-                    log.debug("Completed sync future.");
-
-                onDone();
-            }
-        }
-    }
-
-    /**
-     * Supply message wrapper.
-     */
-    private static class SupplyMessage {
-        /** Sender ID. */
-        private UUID sndId;
-
-        /** Supply message. */
-        private GridDhtPartitionSupplyMessage supply;
-
-        /**
-         * Dummy constructor.
-         */
-        private SupplyMessage() {
-            // No-op.
-        }
-
-        /**
-         * @param sndId Sender ID.
-         * @param supply Supply message.
-         */
-        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
-            this.sndId = sndId;
-            this.supply = supply;
-        }
-
-        /**
-         * @return Sender ID.
-         */
-        UUID senderId() {
-            return sndId;
-        }
-
-        /**
-         * @return Message.
-         */
-        GridDhtPartitionSupplyMessage supply() {
-            return supply;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(SupplyMessage.class, this);
-        }
-    }
-}
\ No newline at end of file


[2/5] ignite git commit: Ignite-1093

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
new file mode 100644
index 0000000..694088b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -0,0 +1,999 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+
+/**
+ * Thread pool for supplying partitions to demanding nodes.
+ */
+class GridDhtPartitionSupplier {
+    /** Cache context this supplier serves. */
+    private final GridCacheContext<?, ?> cctx;
+
+    /** Logger obtained from the cache context. */
+    private final IgniteLogger log;
+
+    /** DHT partition topology of the cache; initialized in the constructor from {@code cctx.dht().topology()}. */
+    private GridDhtPartitionTopology top;
+
+    /** Whether grid deployment is enabled; when {@code true}, messages built from swap entries are prepared manually. */
+    private final boolean depEnabled;
+
+    /**
+     * Optional filter applied to each entry before it is added to a supply message
+     * ({@code null} means every entry is supplied).
+     */
+    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+    /**
+     * In-progress supply contexts keyed by (demander node ID, demand index). A demand that was
+     * split across several batches is resumed from its saved context.
+     */
+    private final ConcurrentHashMap8<T2<UUID, Integer>, SupplyContext> scMap =
+        new ConcurrentHashMap8<>();
+
+    /** Discovery listener (registered in start()) that drops supply contexts of nodes that left or failed. */
+    private GridLocalEventListener lsnr;
+
+    /**
+     * Creates a supplier bound to the given cache.
+     *
+     * @param cctx Cache context, must not be {@code null}.
+     */
+    GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) {
+        assert cctx != null;
+
+        this.cctx = cctx;
+        this.log = cctx.logger(getClass());
+        this.top = cctx.dht().topology();
+        this.depEnabled = cctx.gridDeploy().enabled();
+    }
+
+    /**
+     *
+     */
+    void start() {
+        lsnr = new GridLocalEventListener() {
+            @Override public void onEvent(Event evt) {
+                if (evt instanceof DiscoveryEvent) {
+                    for (Map.Entry<T2<UUID, Integer>, SupplyContext> entry : scMap.entrySet()) {
+                        T2<UUID, Integer> t = entry.getKey();
+
+                        if (t.get1().equals(((DiscoveryEvent)evt).eventNode().id())) {
+                            SupplyContext sctx = entry.getValue();
+
+                            clearContext(sctx, log);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Supply context removed for failed or left node [node=" + t.get1() + "]");
+
+                            scMap.remove(t, sctx);
+                        }
+                    }
+                }
+                else {
+                    assert false;
+                }
+            }
+        };
+
+        cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+        startOldListeners();
+    }
+
+    /**
+     * Unregisters the discovery listener, removes and clears every outstanding supply context,
+     * then stops the old listeners.
+     */
+    void stop() {
+        if (lsnr != null)
+            cctx.events().removeListener(lsnr);
+
+        // Remove each context before clearing it, so a context is cleared at most once and a later
+        // demand can never resume a context whose iterator was closed and partition released.
+        for (Map.Entry<T2<UUID, Integer>, SupplyContext> entry : scMap.entrySet()) {
+            SupplyContext sctx = entry.getValue();
+
+            if (scMap.remove(entry.getKey(), sctx))
+                clearContext(sctx, log);
+        }
+
+        stopOldListeners();
+    }
+
+    /**
+     * Releases the resources held by a supply context. It closes the context's entry iterator if
+     * that iterator is a still-open {@link GridCloseableIterator}. It releases the context's
+     * partition if the partition still has reservations. Does nothing for a {@code null} context.
+     *
+     * @param sc Supply context, may be {@code null}.
+     * @param log Logger used to report a failure to close the iterator.
+     */
+    private static void clearContext(
+        final SupplyContext sc,
+        final IgniteLogger log) {
+        if (sc == null)
+            return;
+
+        final Iterator<?> it = sc.entryIt;
+
+        // instanceof is false for null, so no separate null check is needed.
+        if (it instanceof GridCloseableIterator) {
+            final GridCloseableIterator<?> closeableIt = (GridCloseableIterator<?>)it;
+
+            if (!closeableIt.isClosed()) {
+                try {
+                    // Re-check under the iterator's monitor so that racing callers close it only once.
+                    synchronized (closeableIt) {
+                        if (!closeableIt.isClosed())
+                            closeableIt.close();
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Iterator close failed.", e);
+                }
+            }
+        }
+
+        final GridDhtLocalPartition loc = sc.loc;
+
+        if (loc != null && loc.reservations() > 0) {
+            // Re-check under the partition's monitor before releasing.
+            synchronized (loc) {
+                if (loc.reservations() > 0)
+                    loc.release();
+            }
+        }
+    }
+
+    /**
+     * Installs the filter that is consulted before each entry is added to a supply message.
+     *
+     * @param preloadPred Filter to apply, or {@code null} to supply every entry.
+     */
+    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+        this.preloadPred = preloadPred;
+    }
+
+    /**
+     * @param d Demand message.
+     * @param idx Index.
+     * @param id Node uuid.
+     */
+    public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
+        assert d != null;
+        assert id != null;
+
+        AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion demTop = d.topologyVersion();
+
+        if (cutTop.compareTo(demTop) > 0) {
+            if (log.isDebugEnabled())
+                log.debug("Demand request cancelled [current=" + cutTop + ", demanded=" + demTop +
+                    ", from=" + id + ", idx=" + idx + "]");
+
+            return;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Demand request accepted [current=" + cutTop + ", demanded=" + demTop +
+                ", from=" + id + ", idx=" + idx + "]");
+
+        GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(
+            d.updateSequence(), cctx.cacheId(), d.topologyVersion());
+
+        ClusterNode node = cctx.discovery().node(id);
+
+        T2<UUID, Integer> scId = new T2<>(id, idx);
+
+        try {
+            SupplyContext sctx = scMap.remove(scId);
+
+            if (sctx != null && (!d.topologyVersion().equals(sctx.topVer) || d.updateSequence() != sctx.updateSeq)) {
+                clearContext(sctx, log);
+
+                sctx = null;
+            }
+
+            if (sctx == null && d.partitions() == null)
+                return;
+
+            assert !(sctx != null && d.partitions() != null);
+
+            long bCnt = 0;
+
+            int phase = 0;
+
+            boolean newReq = true;
+
+            long maxBatchesCnt = cctx.config().getRebalanceBatchesCount();
+
+            if (sctx != null) {
+                phase = sctx.phase;
+
+                maxBatchesCnt = 1;
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Starting supplying rebalancing [cache=" + cctx.name() +
+                        ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
+                        ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
+                        ", idx=" + idx + "]");
+            }
+
+            Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
+
+            while ((sctx != null && newReq) || partIt.hasNext()) {
+                int part = sctx != null && newReq ? sctx.part : partIt.next();
+
+                newReq = false;
+
+                GridDhtLocalPartition loc;
+
+                if (sctx != null && sctx.loc != null) {
+                    loc = sctx.loc;
+
+                    assert loc.reservations() > 0;
+                }
+                else {
+                    loc = top.localPartition(part, d.topologyVersion(), false);
+
+                    if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                        // Reply with partition of "-1" to let sender know that
+                        // this node is no longer an owner.
+                        s.missed(part);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Requested partition is not owned by local node [part=" + part +
+                                ", demander=" + id + ']');
+
+                        continue;
+                    }
+                }
+
+                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+                try {
+                    if (phase == 0 && cctx.isSwapOrOffheapEnabled()) {
+                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+                        cctx.swap().addOffHeapListener(part, swapLsnr);
+                        cctx.swap().addSwapListener(part, swapLsnr);
+                    }
+
+                    boolean partMissing = false;
+
+                    if (phase == 0)
+                        phase = 1;
+
+                    if (phase == 1) {
+                        Iterator<GridDhtCacheEntry> entIt = sctx != null ?
+                            (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();
+
+                        while (entIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition, so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition [part=" + part +
+                                        ", nodeId=" + id + ']');
+
+                                partMissing = true;
+
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId,
+                                        phase,
+                                        partIt,
+                                        part,
+                                        entIt,
+                                        swapLsnr,
+                                        loc,
+                                        d.topologyVersion(),
+                                        d.updateSequence());
+
+                                    swapLsnr = null;
+                                    loc = null;
+
+                                    reply(node, d, s, scId);
+
+                                    return;
+                                }
+                                else {
+                                    if (!reply(node, d, s, scId))
+                                        return;
+
+                                    s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+                                        cctx.cacheId(), d.topologyVersion());
+                                }
+                            }
+
+                            GridCacheEntryEx e = entIt.next();
+
+                            GridCacheEntryInfo info = e.info();
+
+                            if (info != null && !info.isNew()) {
+                                if (preloadPred == null || preloadPred.apply(info))
+                                    s.addEntry(part, info, cctx);
+                                else if (log.isDebugEnabled())
+                                    log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                        info);
+                            }
+                        }
+
+                        if (partMissing)
+                            continue;
+
+                    }
+
+                    if (phase == 1) {
+                        phase = 2;
+
+                        if (sctx != null) {
+                            sctx = new SupplyContext(
+                                phase,
+                                partIt,
+                                null,
+                                swapLsnr,
+                                part,
+                                loc,
+                                d.topologyVersion(),
+                                d.updateSequence());
+                        }
+                    }
+
+                    if (phase == 2 && cctx.isSwapOrOffheapEnabled()) {
+                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+                            sctx != null && sctx.entryIt != null ?
+                                (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
+                                cctx.swap().iterator(part);
+
+                        // Iterator may be null if space does not exist.
+                        if (iter != null) {
+                            boolean prepared = false;
+
+                            while (iter.hasNext()) {
+                                if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                    // Demander no longer needs this partition,
+                                    // so we send '-1' partition and move on.
+                                    s.missed(part);
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Demanding node does not need requested partition " +
+                                            "[part=" + part + ", nodeId=" + id + ']');
+
+                                    partMissing = true;
+
+                                    break; // For.
+                                }
+
+                                if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                    if (++bCnt >= maxBatchesCnt) {
+                                        saveSupplyContext(scId,
+                                            phase,
+                                            partIt,
+                                            part,
+                                            iter,
+                                            swapLsnr,
+                                            loc,
+                                            d.topologyVersion(),
+                                            d.updateSequence());
+
+                                        swapLsnr = null;
+                                        loc = null;
+
+                                        reply(node, d, s, scId);
+
+                                        return;
+                                    }
+                                    else {
+                                        if (!reply(node, d, s, scId))
+                                            return;
+
+                                        s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+                                            cctx.cacheId(), d.topologyVersion());
+                                    }
+                                }
+
+                                Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
+
+                                GridCacheSwapEntry swapEntry = e.getValue();
+
+                                GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                                info.keyBytes(e.getKey());
+                                info.ttl(swapEntry.ttl());
+                                info.expireTime(swapEntry.expireTime());
+                                info.version(swapEntry.version());
+                                info.value(swapEntry.value());
+
+                                if (preloadPred == null || preloadPred.apply(info))
+                                    s.addEntry0(part, info, cctx);
+                                else {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Rebalance predicate evaluated to false (will not send " +
+                                            "cache entry): " + info);
+
+                                    continue;
+                                }
+
+                                // Need to manually prepare cache message.
+                                if (depEnabled && !prepared) {
+                                    ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+                                        cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+                                        swapEntry.valueClassLoaderId() != null ?
+                                            cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+                                            null;
+
+                                    if (ldr == null)
+                                        continue;
+
+                                    if (ldr instanceof GridDeploymentInfo) {
+                                        s.prepare((GridDeploymentInfo)ldr);
+
+                                        prepared = true;
+                                    }
+                                }
+                            }
+
+                            iter.close();
+
+                            if (partMissing)
+                                continue;
+                        }
+                    }
+
+                    if (swapLsnr == null && sctx != null)
+                        swapLsnr = sctx.swapLsnr;
+
+                    // Stop receiving promote notifications.
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+
+                    if (phase == 2) {
+                        phase = 3;
+
+                        if (sctx != null) {
+                            sctx = new SupplyContext(
+                                phase,
+                                partIt,
+                                null,
+                                null,
+                                part,
+                                loc,
+                                d.topologyVersion(),
+                                d.updateSequence());
+                        }
+                    }
+
+                    if (phase == 3 && swapLsnr != null) {
+                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+                        swapLsnr = null;
+
+                        Iterator<GridCacheEntryInfo> lsnrIt = sctx != null && sctx.entryIt != null ?
+                            (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();
+
+                        while (lsnrIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition,
+                                // so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition " +
+                                        "[part=" + part + ", nodeId=" + id + ']');
+
+                                // No need to continue iteration over swap entries.
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId,
+                                        phase,
+                                        partIt,
+                                        part,
+                                        lsnrIt,
+                                        swapLsnr,
+                                        loc,
+                                        d.topologyVersion(),
+                                        d.updateSequence());
+
+                                    loc = null;
+
+                                    reply(node, d, s, scId);
+
+                                    return;
+                                }
+                                else {
+                                    if (!reply(node, d, s, scId))
+                                        return;
+
+                                    s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+                                        cctx.cacheId(), d.topologyVersion());
+                                }
+                            }
+
+                            GridCacheEntryInfo info = lsnrIt.next();
+
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    // Mark as last supply message.
+                    s.last(part);
+
+                    phase = 0;
+
+                    sctx = null;
+                }
+                finally {
+                    if (loc != null)
+                        loc.release();
+
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+                }
+            }
+
+            scMap.remove(scId);
+
+            reply(node, d, s, scId);
+
+            if (log.isDebugEnabled())
+                log.debug("Finished supplying rebalancing [cache=" + cctx.name() +
+                    ", fromNode=" + node.id() +
+                    ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
+                    ", idx=" + idx + "]");
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send partition supply message to node: " + id, e);
+        }
+    }
+
+    /**
+     * Sends a V2 supply message to the demanding node and, if configured, sleeps for the rebalance
+     * throttle interval afterwards.
+     * <p>
+     * If the recipient has left the topology, the supply context registered under {@code scId} is
+     * removed from {@code scMap} and cleared.
+     *
+     * @param n Demanding node.
+     * @param d Demand message being answered.
+     * @param s Supply message to send.
+     * @param scId Key of the supply context in {@code scMap}.
+     * @return {@code True} if message was sent, {@code false} if recipient left grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean reply(ClusterNode n,
+        GridDhtPartitionDemandMessage d,
+        GridDhtPartitionSupplyMessageV2 s,
+        T2<UUID, Integer> scId)
+        throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+        try {
+            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+        }
+        catch (ClusterTopologyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+            // Recipient is gone: drop any saved supply state for it.
+            clearContext(scMap.remove(scId), log);
+
+            return false;
+        }
+
+        long throttle = cctx.config().getRebalanceThrottle();
+
+        // Throttle preloading.
+        if (throttle > 0)
+            U.sleep(throttle);
+
+        return true;
+    }
+
+    /**
+     * Stores the state of a supply process that is being paused, so it can be continued later.
+     * The state is put into {@code scMap} under key {@code t}. The method asserts that no context
+     * was registered for that key before.
+     *
+     * @param t Supply context key ({@code scId} at the call sites).
+     * @param phase Supply phase reached.
+     * @param partIt Iterator over the remaining demanded partitions.
+     * @param part Partition currently being supplied.
+     * @param entryIt Iterator over the remaining entries of the current phase.
+     * @param swapLsnr Swap listener collecting entries for the partition (may be {@code null}).
+     * @param loc Local partition being supplied. Callers null out their own reference after saving,
+     *      presumably handing the reservation over to the context (confirm).
+     * @param topVer Topology version of the demand.
+     * @param updateSeq Update sequence of the demand.
+     */
+    private void saveSupplyContext(
+        T2<UUID, Integer> t,
+        int phase,
+        Iterator<Integer> partIt,
+        int part,
+        Iterator<?> entryIt,
+        GridCacheEntryInfoCollectSwapListener swapLsnr,
+        GridDhtLocalPartition loc,
+        AffinityTopologyVersion topVer,
+        long updateSeq) {
+        SupplyContext supplyCtx = new SupplyContext(phase, partIt, entryIt, swapLsnr, part, loc, topVer, updateSeq);
+
+        SupplyContext prev = scMap.putIfAbsent(t, supplyCtx);
+
+        assert prev == null;
+    }
+
+    /**
+     * Saved state of a supply process that was paused after sending the maximum number of batches.
+     * It holds the iterators and the current partition, so iteration can continue where it stopped.
+     */
+    private static class SupplyContext {
+        /** Supply phase reached when the context was saved. */
+        private final int phase;
+
+        /** Topology version of the demand. */
+        private final AffinityTopologyVersion topVer;
+
+        /** Update sequence of the demand. */
+        private final long updateSeq;
+
+        /** Iterator over the remaining demanded partitions. */
+        private final Iterator<Integer> partIt;
+
+        /** Partition that was being supplied. */
+        private final int part;
+
+        /** Iterator over the remaining entries of the current phase. */
+        private final Iterator<?> entryIt;
+
+        /** Local partition that was being supplied. */
+        private final GridDhtLocalPartition loc;
+
+        /** Swap listener collecting entries of {@link #part} (may be {@code null}). */
+        private final GridCacheEntryInfoCollectSwapListener swapLsnr;
+
+        /**
+         * @param phase Supply phase.
+         * @param partIt Iterator over the remaining demanded partitions.
+         * @param entryIt Iterator over the remaining entries of the current phase.
+         * @param swapLsnr Swap listener (may be {@code null}).
+         * @param part Partition being supplied.
+         * @param loc Local partition being supplied.
+         * @param topVer Topology version of the demand.
+         * @param updateSeq Update sequence of the demand.
+         */
+        public SupplyContext(int phase,
+            Iterator<Integer> partIt,
+            Iterator<?> entryIt,
+            GridCacheEntryInfoCollectSwapListener swapLsnr,
+            int part,
+            GridDhtLocalPartition loc,
+            AffinityTopologyVersion topVer,
+            long updateSeq) {
+            this.phase = phase;
+            this.topVer = topVer;
+            this.updateSeq = updateSeq;
+
+            this.partIt = partIt;
+            this.part = part;
+            this.entryIt = entryIt;
+
+            this.loc = loc;
+            this.swapLsnr = swapLsnr;
+        }
+    }
+
+    @Deprecated//Backward compatibility. To be removed in future.
+    public void startOldListeners() {
+        if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) {
+            cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
+                @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+                    processOldDemandMessage(m, id);
+                }
+            });
+        }
+    }
+
+    @Deprecated//Backward compatibility. To be removed in future.
+    public void stopOldListeners() {
+        if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) {
+
+            cctx.io().removeHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class);
+        }
+    }
+
+    /**
+     * Serves a demand message from a node that uses the legacy rebalancing protocol. Replies are
+     * sent as {@link GridDhtPartitionSupplyMessage} rather than the V2 message.
+     * <p>
+     * Each requested partition must be locally {@code OWNING} and successfully reserved. Its entries
+     * are then sent in three passes:
+     * <ol>
+     *     <li>on-heap entries;</li>
+     *     <li>swap/off-heap entries, when swap or off-heap is enabled;</li>
+     *     <li>entries collected by a swap listener registered for the partition during the first
+     *     two passes.</li>
+     * </ol>
+     * A partition that is not owned locally, or that the demander no longer needs according to
+     * affinity, is reported as missed.
+     * <p>
+     * Whenever the current message reaches the configured rebalance batch size, it is sent and a new
+     * one is started. If at least one such intermediate message was sent, the message that completes
+     * the current partition is marked for acknowledgement, and no further partitions are processed
+     * for this demand.
+     *
+     * @param d Demand message.
+     * @param id ID of the demanding node.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) {
+        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
+            d.updateSequence(), cctx.cacheId());
+
+        ClusterNode node = cctx.node(id);
+
+        // The demander may be unknown locally, e.g. it already left. Continuing would pass a null
+        // node to affinity checks and to message sending.
+        if (node == null) {
+            if (log.isDebugEnabled())
+                log.debug("Demanding node is not found, will not process demand message [nodeId=" + id +
+                    ", demand=" + d + ']');
+
+            return;
+        }
+
+        long preloadThrottle = cctx.config().getRebalanceThrottle();
+
+        // Set once an intermediate batch has been sent; the final message then requests an ack.
+        boolean ack = false;
+
+        try {
+            for (int part : d.partitions()) {
+                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+
+                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                    // Reply with partition of "-1" to let sender know that
+                    // this node is no longer an owner.
+                    s.missed(part);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Requested partition is not owned by local node [part=" + part +
+                            ", demander=" + id + ']');
+
+                    continue;
+                }
+
+                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+                try {
+                    // Collect entries promoted from swap/off-heap while the first two passes run.
+                    if (cctx.isSwapOrOffheapEnabled()) {
+                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+                        cctx.swap().addOffHeapListener(part, swapLsnr);
+                        cctx.swap().addSwapListener(part, swapLsnr);
+                    }
+
+                    boolean partMissing = false;
+
+                    // Pass 1: on-heap entries.
+                    for (GridCacheEntryEx e : loc.entries()) {
+                        if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                            // Demander no longer needs this partition, so we send '-1' partition and move on.
+                            s.missed(part);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Demanding node does not need requested partition [part=" + part +
+                                    ", nodeId=" + id + ']');
+
+                            partMissing = true;
+
+                            break;
+                        }
+
+                        if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                            ack = true;
+
+                            if (!replyOld(node, d, s))
+                                return;
+
+                            // Throttle preloading.
+                            if (preloadThrottle > 0)
+                                U.sleep(preloadThrottle);
+
+                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                cctx.cacheId());
+                        }
+
+                        GridCacheEntryInfo info = e.info();
+
+                        if (info != null && !info.isNew()) {
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    if (partMissing)
+                        continue;
+
+                    // Pass 2: swap/off-heap entries.
+                    if (cctx.isSwapOrOffheapEnabled()) {
+                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+                            cctx.swap().iterator(part);
+
+                        // Iterator may be null if space does not exist.
+                        if (iter != null) {
+                            try {
+                                boolean prepared = false;
+
+                                for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
+                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                        // Demander no longer needs this partition,
+                                        // so we send '-1' partition and move on.
+                                        s.missed(part);
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Demanding node does not need requested partition " +
+                                                "[part=" + part + ", nodeId=" + id + ']');
+
+                                        partMissing = true;
+
+                                        break; // For.
+                                    }
+
+                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                        ack = true;
+
+                                        if (!replyOld(node, d, s))
+                                            return;
+
+                                        // Throttle preloading.
+                                        if (preloadThrottle > 0)
+                                            U.sleep(preloadThrottle);
+
+                                        s = new GridDhtPartitionSupplyMessage(d.workerId(),
+                                            d.updateSequence(), cctx.cacheId());
+                                    }
+
+                                    GridCacheSwapEntry swapEntry = e.getValue();
+
+                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                                    info.keyBytes(e.getKey());
+                                    info.ttl(swapEntry.ttl());
+                                    info.expireTime(swapEntry.expireTime());
+                                    info.version(swapEntry.version());
+                                    info.value(swapEntry.value());
+
+                                    if (preloadPred == null || preloadPred.apply(info))
+                                        s.addEntry0(part, info, cctx);
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Rebalance predicate evaluated to false (will not send " +
+                                                "cache entry): " + info);
+
+                                        continue;
+                                    }
+
+                                    // Need to manually prepare cache message.
+                                    if (depEnabled && !prepared) {
+                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+                                            swapEntry.valueClassLoaderId() != null ?
+                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+                                                null;
+
+                                        if (ldr == null)
+                                            continue;
+
+                                        if (ldr instanceof GridDeploymentInfo) {
+                                            s.prepare((GridDeploymentInfo)ldr);
+
+                                            prepared = true;
+                                        }
+                                    }
+                                }
+
+                                if (partMissing)
+                                    continue;
+                            }
+                            finally {
+                                iter.close();
+                            }
+                        }
+                    }
+
+                    // Stop receiving promote notifications.
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+
+                    // Pass 3: entries collected by the swap listener during passes 1 and 2.
+                    if (swapLsnr != null) {
+                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+                        // Listener is already removed; prevents a second removal in 'finally'.
+                        swapLsnr = null;
+
+                        for (GridCacheEntryInfo info : entries) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition,
+                                // so we send '-1' partition and move on.
+                                // NOTE(review): unlike passes 1 and 2, execution still reaches
+                                // s.last(part) below, so the partition is reported as both missed
+                                // and last. Confirm the demander handles that.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition " +
+                                        "[part=" + part + ", nodeId=" + id + ']');
+
+                                // No need to continue iteration over swap entries.
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                ack = true;
+
+                                if (!replyOld(node, d, s))
+                                    return;
+
+                                // Throttle preloading (same as in the other passes).
+                                if (preloadThrottle > 0)
+                                    U.sleep(preloadThrottle);
+
+                                s = new GridDhtPartitionSupplyMessage(d.workerId(),
+                                    d.updateSequence(),
+                                    cctx.cacheId());
+                            }
+
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    // Mark as last supply message.
+                    s.last(part);
+
+                    // An intermediate batch was sent: request an ack and stop after this partition.
+                    if (ack) {
+                        s.markAck();
+
+                        break; // Partition for loop.
+                    }
+                }
+                finally {
+                    loc.release();
+
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+                }
+            }
+
+            replyOld(node, d, s);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send partition supply message to node: " + id, e);
+        }
+    }
+
+    /**
+     * Sends a legacy supply message to the demanding node. Unlike the V2 path, no throttling is
+     * applied here.
+     *
+     * @param n Demanding node.
+     * @param d Demand message being answered.
+     * @param s Supply message to send.
+     * @return {@code True} if message was sent, {@code false} if recipient left grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+        throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+        boolean sent;
+
+        try {
+            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+            sent = true;
+        }
+        catch (ClusterTopologyCheckedException ignore) {
+            // Recipient left topology: not an error, reported to the caller via the return value.
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+            sent = false;
+        }
+
+        return sent;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
new file mode 100644
index 0000000..bb89a42
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -0,0 +1,404 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Partition supply message (V2 rebalancing protocol).
+ * <p>
+ * The message carries cache entries grouped by partition, the partitions that have been fully sent
+ * ({@link #last()}) and the partitions the supplier could not provide ({@link #missed()}).
+ * {@link #messageSize()} is a running estimate of the payload size, maintained as entries and
+ * partitions are added.
+ */
+public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements GridCacheDeployable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Size, in bytes, accounted for one partition number ({@code int}) in {@link #msgSize}. */
+    private static final int PART_ID_SIZE = 4;
+
+    /** Update sequence. */
+    private long updateSeq;
+
+    /** Acknowledgement flag. */
+    private boolean ack;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Partitions that have been fully sent. */
+    @GridDirectCollection(int.class)
+    private Collection<Integer> last;
+
+    /** Partitions which were not found. */
+    @GridToStringInclude
+    @GridDirectCollection(int.class)
+    private Collection<Integer> missed;
+
+    /** Entries. */
+    @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
+    private Map<Integer, CacheEntryInfoCollection> infos = new HashMap<>();
+
+    /** Message size. */
+    @GridDirectTransient
+    private int msgSize;
+
+    /**
+     * @param updateSeq Update sequence for this node (must be positive).
+     * @param cacheId Cache ID.
+     * @param topVer Topology version of the demand being answered.
+     */
+    GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer) {
+        assert updateSeq > 0;
+
+        this.cacheId = cacheId;
+        this.updateSeq = updateSeq;
+        this.topVer = topVer;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridDhtPartitionSupplyMessageV2() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean ignoreClassErrors() {
+        return true;
+    }
+
+    /**
+     * @return Update sequence.
+     */
+    long updateSequence() {
+        return updateSeq;
+    }
+
+    /**
+     * Marks this message for acknowledgment.
+     */
+    void markAck() {
+        ack = true;
+    }
+
+    /**
+     * @return Acknowledgement flag.
+     */
+    boolean ack() {
+        return ack;
+    }
+
+    /**
+     * @return Topology version for which demand message is sent.
+     */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Partitions fully sent by this message (never {@code null}).
+     */
+    Collection<Integer> last() {
+        return last == null ? Collections.<Integer>emptySet() : last;
+    }
+
+    /**
+     * Marks partition {@code p} as fully sent. The partition is also registered in the entry map,
+     * with an empty entry collection if it has no entries yet.
+     *
+     * @param p Partition which was fully sent.
+     */
+    void last(int p) {
+        if (last == null)
+            last = new HashSet<>();
+
+        if (last.add(p)) {
+            msgSize += PART_ID_SIZE;
+
+            // If partition is empty, we still need to add it. The helper also accounts for the
+            // partition key in msgSize, consistently with addEntry().
+            infoCollection(p);
+        }
+    }
+
+    /**
+     * @param p Missed partition.
+     */
+    void missed(int p) {
+        if (missed == null)
+            missed = new HashSet<>();
+
+        if (missed.add(p))
+            msgSize += PART_ID_SIZE;
+    }
+
+    /**
+     * @return Missed partitions (never {@code null}).
+     */
+    Collection<Integer> missed() {
+        return missed == null ? Collections.<Integer>emptySet() : missed;
+    }
+
+    /**
+     * @return Entries.
+     */
+    Map<Integer, CacheEntryInfoCollection> infos() {
+        return infos;
+    }
+
+    /**
+     * @return Message size.
+     */
+    int messageSize() {
+        return msgSize;
+    }
+
+    /**
+     * Returns the entry collection for partition {@code p}. If there is none yet, creates and
+     * registers an empty one and adds the partition key to {@link #msgSize}.
+     *
+     * @param p Partition.
+     * @return Entry collection of the partition.
+     */
+    private CacheEntryInfoCollection infoCollection(int p) {
+        CacheEntryInfoCollection infoCol = infos.get(p);
+
+        if (infoCol == null) {
+            msgSize += PART_ID_SIZE;
+
+            infoCol = new CacheEntryInfoCollection();
+
+            infoCol.init();
+
+            infos.put(p, infoCol);
+        }
+
+        return infoCol;
+    }
+
+    /**
+     * Marshals {@code info}, adds its marshalled size to the message size and appends it to the
+     * entries of partition {@code p}.
+     *
+     * @param p Partition.
+     * @param info Entry to add.
+     * @param ctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+        assert info != null;
+
+        marshalInfo(info, ctx);
+
+        msgSize += info.marshalledSize(ctx);
+
+        infoCollection(p).add(info);
+    }
+
+    /**
+     * Same as {@link #addEntry(int, GridCacheEntryInfo, GridCacheContext)}. It additionally asserts
+     * that the entry has a key (object or bytes) and a value, as for entries built from swap.
+     *
+     * @param p Partition.
+     * @param info Entry to add.
+     * @param ctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+        assert info != null;
+        assert (info.key() != null || info.keyBytes() != null);
+        assert info.value() != null;
+
+        // Marshalling inside addEntry() is needed to initialize info properly.
+        addEntry(p, info, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
+
+        for (CacheEntryInfoCollection col : infos().values()) {
+            List<GridCacheEntryInfo> entries = col.infos();
+
+            for (int i = 0; i < entries.size(); i++)
+                entries.get(i).unmarshal(cacheCtx, ldr);
+        }
+    }
+
+    /**
+     * @return Number of partitions in the entry map. This includes partitions registered with no
+     *      entries by {@link #last(int)}. It is not the number of cache entries.
+     */
+    public int size() {
+        return infos.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeBoolean("ack", ack))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeLong("updateSeq", updateSeq))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                ack = reader.readBoolean("ack");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                last = reader.readCollection("last", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                missed = reader.readCollection("missed", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                updateSeq = reader.readLong("updateSeq");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridDhtPartitionSupplyMessageV2.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 114;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 9;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtPartitionSupplyMessageV2.class, this,
+            "size", size(),
+            "parts", infos.keySet(),
+            "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
deleted file mode 100644
index fe328ef..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ /dev/null
@@ -1,555 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.io.Externalizable;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.locks.ReadWriteLock;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener;
-import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.util.lang.GridCloseableIterator;
-import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.Nullable;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
-
-/**
- * Thread pool for supplying partitions to demanding nodes.
- */
-class GridDhtPartitionSupplyPool {
-    /** */
-    private final GridCacheContext<?, ?> cctx;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private final ReadWriteLock busyLock;
-
-    /** */
-    private GridDhtPartitionTopology top;
-
-    /** */
-    private final Collection<SupplyWorker> workers = new LinkedList<>();
-
-    /** */
-    private final BlockingQueue<DemandMessage> queue = new LinkedBlockingDeque<>();
-
-    /** */
-    private final boolean depEnabled;
-
-    /** Preload predicate. */
-    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
-    /**
-     * @param cctx Cache context.
-     * @param busyLock Shutdown lock.
-     */
-    GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
-        assert cctx != null;
-        assert busyLock != null;
-
-        this.cctx = cctx;
-        this.busyLock = busyLock;
-
-        log = cctx.logger(getClass());
-
-        top = cctx.dht().topology();
-
-        if (!cctx.kernalContext().clientNode()) {
-            int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
-            for (int i = 0; i < poolSize; i++)
-                workers.add(new SupplyWorker());
-
-            cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
-                @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
-                    processDemandMessage(id, m);
-                }
-            });
-        }
-
-        depEnabled = cctx.gridDeploy().enabled();
-    }
-
-    /**
-     *
-     */
-    void start() {
-        for (SupplyWorker w : workers)
-            new IgniteThread(cctx.gridName(), "preloader-supply-worker", w).start();
-    }
-
-    /**
-     *
-     */
-    void stop() {
-        U.cancel(workers);
-        U.join(workers, log);
-
-        top = null;
-    }
-
-    /**
-     * Sets preload predicate for supply pool.
-     *
-     * @param preloadPred Preload predicate.
-     */
-    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
-        this.preloadPred = preloadPred;
-    }
-
-    /**
-     * @return Size of this thread pool.
-     */
-    int poolSize() {
-        return cctx.config().getRebalanceThreadPoolSize();
-    }
-
-    /**
-     * @return {@code true} if entered to busy state.
-     */
-    private boolean enterBusy() {
-        if (busyLock.readLock().tryLock())
-            return true;
-
-        if (log.isDebugEnabled())
-            log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId());
-
-        return false;
-    }
-
-    /**
-     * @param nodeId Sender node ID.
-     * @param d Message.
-     */
-    private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage d) {
-        if (!enterBusy())
-            return;
-
-        try {
-            if (cctx.rebalanceEnabled()) {
-                if (log.isDebugEnabled())
-                    log.debug("Received partition demand [node=" + nodeId + ", demand=" + d + ']');
-
-                queue.offer(new DemandMessage(nodeId, d));
-            }
-            else
-                U.warn(log, "Received partition demand message when rebalancing is disabled (will ignore): " + d);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     *
-     */
-    private void leaveBusy() {
-        busyLock.readLock().unlock();
-    }
-
-    /**
-     * @param deque Deque to poll from.
-     * @param w Worker.
-     * @return Polled item.
-     * @throws InterruptedException If interrupted.
-     */
-    @Nullable private <T> T poll(BlockingQueue<T> deque, GridWorker w) throws InterruptedException {
-        assert w != null;
-
-        // There is currently a case where {@code interrupted}
-        // flag on a thread gets flipped during stop which causes the pool to hang.  This check
-        // will always make sure that interrupted flag gets reset before going into wait conditions.
-        // The true fix should actually make sure that interrupted flag does not get reset or that
-        // interrupted exception gets propagated. Until we find a real fix, this method should
-        // always work to make sure that there is no hanging during stop.
-        if (w.isCancelled())
-            Thread.currentThread().interrupt();
-
-        return deque.poll(2000, MILLISECONDS);
-    }
-
-    /**
-     * Supply work.
-     */
-    private class SupplyWorker extends GridWorker {
-        /** Hide worker logger and use cache logger. */
-        private IgniteLogger log = GridDhtPartitionSupplyPool.this.log;
-
-        /**
-         * Default constructor.
-         */
-        private SupplyWorker() {
-            super(cctx.gridName(), "preloader-supply-worker", GridDhtPartitionSupplyPool.this.log);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            while (!isCancelled()) {
-                DemandMessage msg = poll(queue, this);
-
-                if (msg == null)
-                    continue;
-
-                ClusterNode node = cctx.discovery().node(msg.senderId());
-
-                if (node == null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Received message from non-existing node (will ignore): " + msg);
-
-                    continue;
-                }
-
-                processMessage(msg, node);
-            }
-        }
-
-        /**
-         * @param msg Message.
-         * @param node Demander.
-         */
-        private void processMessage(DemandMessage msg, ClusterNode node) {
-            assert msg != null;
-            assert node != null;
-
-            GridDhtPartitionDemandMessage d = msg.message();
-
-            GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
-                d.updateSequence(), cctx.cacheId());
-
-            long preloadThrottle = cctx.config().getRebalanceThrottle();
-
-            boolean ack = false;
-
-            try {
-                for (int part : d.partitions()) {
-                    GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
-
-                    if (loc == null || loc.state() != OWNING || !loc.reserve()) {
-                        // Reply with partition of "-1" to let sender know that
-                        // this node is no longer an owner.
-                        s.missed(part);
-
-                        if (log.isDebugEnabled())
-                            log.debug("Requested partition is not owned by local node [part=" + part +
-                                ", demander=" + msg.senderId() + ']');
-
-                        continue;
-                    }
-
-                    GridCacheEntryInfoCollectSwapListener swapLsnr = null;
-
-                    try {
-                        if (cctx.isSwapOrOffheapEnabled()) {
-                            swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
-
-                            cctx.swap().addOffHeapListener(part, swapLsnr);
-                            cctx.swap().addSwapListener(part, swapLsnr);
-                        }
-
-                        boolean partMissing = false;
-
-                        for (GridCacheEntryEx e : loc.entries()) {
-                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                // Demander no longer needs this partition, so we send '-1' partition and move on.
-                                s.missed(part);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Demanding node does not need requested partition [part=" + part +
-                                        ", nodeId=" + msg.senderId() + ']');
-
-                                partMissing = true;
-
-                                break;
-                            }
-
-                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                ack = true;
-
-                                if (!reply(node, d, s))
-                                    return;
-
-                                // Throttle preloading.
-                                if (preloadThrottle > 0)
-                                    U.sleep(preloadThrottle);
-
-                                s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
-                                    cctx.cacheId());
-                            }
-
-                            GridCacheEntryInfo info = e.info();
-
-                            if (info != null && !info.isNew()) {
-                                if (preloadPred == null || preloadPred.apply(info))
-                                    s.addEntry(part, info, cctx);
-                                else if (log.isDebugEnabled())
-                                    log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
-                                        info);
-                            }
-                        }
-
-                        if (partMissing)
-                            continue;
-
-                        if (cctx.isSwapOrOffheapEnabled()) {
-                            GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
-                                cctx.swap().iterator(part);
-
-                            // Iterator may be null if space does not exist.
-                            if (iter != null) {
-                                try {
-                                    boolean prepared = false;
-
-                                    for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
-                                        if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                            // Demander no longer needs this partition,
-                                            // so we send '-1' partition and move on.
-                                            s.missed(part);
-
-                                            if (log.isDebugEnabled())
-                                                log.debug("Demanding node does not need requested partition " +
-                                                    "[part=" + part + ", nodeId=" + msg.senderId() + ']');
-
-                                            partMissing = true;
-
-                                            break; // For.
-                                        }
-
-                                        if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                            ack = true;
-
-                                            if (!reply(node, d, s))
-                                                return;
-
-                                            // Throttle preloading.
-                                            if (preloadThrottle > 0)
-                                                U.sleep(preloadThrottle);
-
-                                            s = new GridDhtPartitionSupplyMessage(d.workerId(),
-                                                d.updateSequence(), cctx.cacheId());
-                                        }
-
-                                        GridCacheSwapEntry swapEntry = e.getValue();
-
-                                        GridCacheEntryInfo info = new GridCacheEntryInfo();
-
-                                        info.keyBytes(e.getKey());
-                                        info.ttl(swapEntry.ttl());
-                                        info.expireTime(swapEntry.expireTime());
-                                        info.version(swapEntry.version());
-                                        info.value(swapEntry.value());
-
-                                        if (preloadPred == null || preloadPred.apply(info))
-                                            s.addEntry0(part, info, cctx);
-                                        else {
-                                            if (log.isDebugEnabled())
-                                                log.debug("Rebalance predicate evaluated to false (will not send " +
-                                                    "cache entry): " + info);
-
-                                            continue;
-                                        }
-
-                                        // Need to manually prepare cache message.
-                                        if (depEnabled && !prepared) {
-                                            ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
-                                                cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
-                                                swapEntry.valueClassLoaderId() != null ?
-                                                    cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
-                                                    null;
-
-                                            if (ldr == null)
-                                                continue;
-
-                                            if (ldr instanceof GridDeploymentInfo) {
-                                                s.prepare((GridDeploymentInfo)ldr);
-
-                                                prepared = true;
-                                            }
-                                        }
-                                    }
-
-                                    if (partMissing)
-                                        continue;
-                                }
-                                finally {
-                                    iter.close();
-                                }
-                            }
-                        }
-
-                        // Stop receiving promote notifications.
-                        if (swapLsnr != null) {
-                            cctx.swap().removeOffHeapListener(part, swapLsnr);
-                            cctx.swap().removeSwapListener(part, swapLsnr);
-                        }
-
-                        if (swapLsnr != null) {
-                            Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
-
-                            swapLsnr = null;
-
-                            for (GridCacheEntryInfo info : entries) {
-                                if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                    // Demander no longer needs this partition,
-                                    // so we send '-1' partition and move on.
-                                    s.missed(part);
-
-                                    if (log.isDebugEnabled())
-                                        log.debug("Demanding node does not need requested partition " +
-                                            "[part=" + part + ", nodeId=" + msg.senderId() + ']');
-
-                                    // No need to continue iteration over swap entries.
-                                    break;
-                                }
-
-                                if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                    ack = true;
-
-                                    if (!reply(node, d, s))
-                                        return;
-
-                                    s = new GridDhtPartitionSupplyMessage(d.workerId(),
-                                        d.updateSequence(),
-                                        cctx.cacheId());
-                                }
-
-                                if (preloadPred == null || preloadPred.apply(info))
-                                    s.addEntry(part, info, cctx);
-                                else if (log.isDebugEnabled())
-                                    log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
-                                        info);
-                            }
-                        }
-
-                        // Mark as last supply message.
-                        s.last(part);
-
-                        if (ack) {
-                            s.markAck();
-
-                            break; // Partition for loop.
-                        }
-                    }
-                    finally {
-                        loc.release();
-
-                        if (swapLsnr != null) {
-                            cctx.swap().removeOffHeapListener(part, swapLsnr);
-                            cctx.swap().removeSwapListener(part, swapLsnr);
-                        }
-                    }
-                }
-
-                reply(node, d, s);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
-            }
-        }
-
-        /**
-         * @param n Node.
-         * @param d Demand message.
-         * @param s Supply message.
-         * @return {@code True} if message was sent, {@code false} if recipient left grid.
-         * @throws IgniteCheckedException If failed.
-         */
-        private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
-            throws IgniteCheckedException {
-            try {
-                if (log.isDebugEnabled())
-                    log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
-
-                cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
-
-                return true;
-            }
-            catch (ClusterTopologyCheckedException ignore) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send partition supply message because node left grid: " + n.id());
-
-                return false;
-            }
-        }
-    }
-
-    /**
-     * Demand message wrapper.
-     */
-    private static class DemandMessage extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * @param sndId Sender ID.
-         * @param msg Message.
-         */
-        DemandMessage(UUID sndId, GridDhtPartitionDemandMessage msg) {
-            super(sndId, msg);
-        }
-
-        /**
-         * Empty constructor required for {@link Externalizable}.
-         */
-        public DemandMessage() {
-            // No-op.
-        }
-
-        /**
-         * @return Sender ID.
-         */
-        UUID senderId() {
-            return get1();
-        }
-
-        /**
-         * @return Message.
-         */
-        public GridDhtPartitionDemandMessage message() {
-            return get2();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']';
-        }
-    }
-}
\ No newline at end of file


[3/5] ignite git commit: Ignite-1093

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
new file mode 100644
index 0000000..6479542
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -0,0 +1,1310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+import org.apache.ignite.internal.util.GridLeanSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
+import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
+import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
+
+/**
+ * Thread pool for requesting partitions from other nodes and populating local cache.
+ */
+@SuppressWarnings("NonConstantFieldWithUpperCaseName")
+public class GridDhtPartitionDemander {
+    /** Cache context. */
+    private final GridCacheContext<?, ?> cctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * Preload predicate, set via {@link #preloadPredicate(IgnitePredicate)}.
+     * NOTE(review): not volatile - presumably set before rebalancing starts; confirm.
+     */
+    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+    /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
+    @GridToStringInclude
+    private final GridFutureAdapter syncFut = new GridFutureAdapter();
+
+    /** Current rebalance future; replaced whenever new assignments are applied. */
+    @GridToStringInclude
+    private volatile RebalanceFuture rebalanceFut;
+
+    /**
+     * Last timeout object. The reference itself never changes (only its content is swapped via
+     * {@code set}/{@code getAndSet}), hence {@code final}.
+     */
+    private final AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
+
+    /** Last exchange future; used by {@link #forcePreload()} to re-trigger a preload exchange. */
+    private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
+
+    /** Demand lock. */
+    @Deprecated // Backward compatibility. To be removed in future.
+    private final ReadWriteLock demandLock;
+
+    /**
+     * Creates a demander for the given cache.
+     * <p>
+     * If rebalancing is disabled for the cache, or the local node is a client node, the initial
+     * (dummy) rebalance future and the sync future are completed right away.
+     *
+     * @param cctx Cache context (must not be {@code null}).
+     * @param demandLock Demand lock (kept for backward compatibility).
+     */
+    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock demandLock) {
+        assert cctx != null;
+
+        this.cctx = cctx;
+        this.demandLock = demandLock;
+        this.log = cctx.logger(getClass());
+
+        boolean disabled = !cctx.rebalanceEnabled() || cctx.kernalContext().clientNode();
+
+        // Placeholder future until real assignments arrive.
+        RebalanceFuture dummy = new RebalanceFuture();
+
+        rebalanceFut = dummy;
+
+        if (disabled) {
+            // Rebalancing will not happen, so finish both futures immediately.
+            dummy.onDone(true);
+            syncFut.onDone();
+        }
+    }
+
+    /**
+     * Starts the demander. Currently a no-op.
+     */
+    void start() {
+        // No-op.
+    }
+
+    /**
+     * Stops the demander by dropping the references to the last exchange future and to the last
+     * timeout object.
+     * <p>
+     * NOTE(review): unlike {@link #forcePreload()}, the timeout object is not removed from the
+     * timeout processor here - confirm this is intended.
+     */
+    void stop() {
+        lastExchangeFut = null;
+        lastTimeoutObj.set(null);
+    }
+
+    /**
+     * Returns the future used for {@link CacheRebalanceMode#SYNC} mode. The constructor completes it
+     * immediately when rebalancing is disabled or the local node is a client node.
+     *
+     * @return Future for {@link CacheRebalanceMode#SYNC} mode.
+     */
+    IgniteInternalFuture<?> syncFuture() {
+        return syncFut;
+    }
+
+    /**
+     * Returns the current rebalance future. The field is replaced each time new assignments are
+     * applied, so a previously obtained reference may refer to a superseded future.
+     *
+     * @return Rebalance future.
+     */
+    IgniteInternalFuture<Boolean> rebalanceFuture() {
+        return rebalanceFut;
+    }
+
+    /**
+     * Sets preload predicate for demand pool.
+     * <p>
+     * NOTE(review): the backing field is not volatile; presumably this is called before rebalancing
+     * starts - confirm.
+     *
+     * @param preloadPred Preload predicate.
+     */
+    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+        this.preloadPred = preloadPred;
+    }
+
+    /**
+     * Forces rebalancing.
+     * <p>
+     * Cancels the pending timeout object (if any) and, once the last exchange future completes, asks
+     * the exchange manager to force a preload exchange for it. If no exchange has happened yet, the
+     * request is only logged and ignored.
+     */
+    void forcePreload() {
+        GridTimeoutObject pending = lastTimeoutObj.getAndSet(null);
+
+        if (pending != null)
+            cctx.time().removeTimeoutObject(pending);
+
+        final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
+
+        if (exchFut == null) {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring force rebalance request (no topology event happened yet).");
+
+            return;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Forcing rebalance event for future: " + exchFut);
+
+        exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> done) {
+                cctx.shared().exchange().forcePreloadExchange(exchFut);
+            }
+        });
+    }
+
+    /**
+     * Checks whether the given rebalance future is outdated.
+     *
+     * @param fut Rebalance future to check.
+     * @return {@code True} if the affinity topology version no longer matches the future's version,
+     *      or if the future is no longer the current one (same topology, but a dummy exchange was
+     *      forced because of missing partitions).
+     */
+    private boolean topologyChanged(RebalanceFuture fut) {
+        // Topology already changed.
+        if (!cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()))
+            return true;
+
+        // Same topology, but dummy exchange forced because of missing partitions.
+        return fut != rebalanceFut;
+    }
+
+    /**
+     * Records a preload event for a partition, attributed to the given discovery event.
+     *
+     * @param part Partition.
+     * @param type Preload event type.
+     * @param discoEvt Discovery event (must not be {@code null}).
+     */
+    private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
+        assert discoEvt != null;
+
+        ClusterNode evtNode = discoEvt.eventNode();
+        int discoType = discoEvt.type();
+        long discoTs = discoEvt.timestamp();
+
+        cctx.events().addPreloadEvent(part, type, evtNode, discoType, discoTs);
+    }
+
+    /**
+     * Waits for rebalancing of another cache to finish before rebalancing of this cache proceeds.
+     *
+     * @param name Name of the cache to wait for.
+     * @param fut Rebalance future of this cache.
+     * @return {@code True} if the other cache finished rebalancing for the same topology and update
+     *      sequence without missed partitions; {@code false} if waiting was skipped.
+     * @throws IgniteCheckedException If getting the other cache's rebalance result failed.
+     */
+    private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Waiting for " + name + " cache rebalancing [cacheName=" + cctx.name() + ']');
+
+        // NOTE(review): internalCache(name) is dereferenced without a null check - confirm the
+        // awaited cache is guaranteed to be started at this point.
+        RebalanceFuture otherFut =
+            (RebalanceFuture)cctx.kernalContext().cache().internalCache(name).preloader().rebalanceFuture();
+
+        boolean sameRebalance = !topologyChanged(fut) && otherFut.updateSeq == fut.updateSeq;
+
+        if (!sameRebalance) {
+            U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() +
+                "] (topology already changed)");
+
+            return false;
+        }
+
+        // Waits for the other cache's result; false means it finished with missed partitions.
+        if (otherFut.get())
+            return true;
+
+        U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() +
+            "] (cache rebalanced with missed partitions)");
+
+        return false;
+    }
+
+    /**
+     * Registers new partition assignments for this cache.
+     * <p>
+     * If the rebalance delay is {@code 0} or {@code force} is set, a new rebalance future replaces the current
+     * one: a non-initial old future is cancelled, while the initial one is completed with the new future's result.
+     * In that case a task is returned that first waits for rebalancing of {@code caches} and then requests
+     * partitions. If the delay is positive, a timeout object is (re)scheduled that forces a rebalance exchange
+     * once the delay expires. If the delay is negative, nothing is scheduled.
+     *
+     * @param assigns Assignments.
+     * @param force {@code True} if dummy reassign.
+     * @param caches Rebalancing of these caches will be finished before this started.
+     * @param cnt Counter (used as the update sequence of the new rebalance future).
+     * @return Task performing the rebalancing, or {@code null} if there is nothing to run right now
+     *      (empty assignments, delayed rebalancing or negative delay).
+     */
+    Callable<Boolean> addAssignments(final GridDhtPreloaderAssignments assigns, boolean force,
+        final Collection<String> caches, int cnt) {
+        if (log.isDebugEnabled())
+            log.debug("Adding partition assignments: " + assigns);
+
+        long delay = cctx.config().getRebalanceDelay();
+
+        if (delay == 0 || force) {
+            assert assigns != null;
+
+            final RebalanceFuture oldFut = rebalanceFut;
+
+            final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, oldFut.isInitial(), cnt);
+
+            if (!oldFut.isInitial())
+                oldFut.cancel();
+            else
+                // The initial (dummy) future is exposed to waiters before any assignment exists,
+                // so it is completed with the result of the first real future.
+                fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                    @Override public void apply(IgniteInternalFuture<Boolean> future) {
+                        oldFut.onDone(fut.result());
+                    }
+                });
+
+            rebalanceFut = fut;
+
+            if (assigns.isEmpty()) {
+                fut.doneIfEmpty();
+
+                return null;
+            }
+
+            return new Callable<Boolean>() {
+                @Override public Boolean call() throws Exception {
+                    for (String c : caches) {
+                        if (!waitForCacheRebalancing(c, fut))
+                            return false;
+                    }
+
+                    return requestPartitions(fut, assigns);
+                }
+            };
+        }
+        else if (delay > 0) {
+            GridTimeoutObject obj = lastTimeoutObj.get();
+
+            // Only the latest delayed rebalance is kept scheduled.
+            if (obj != null)
+                cctx.time().removeTimeoutObject(obj);
+
+            final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
+
+            assert exchFut != null : "Delaying rebalance process without topology event.";
+
+            obj = new GridTimeoutObjectAdapter(delay) {
+                @Override public void onTimeout() {
+                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
+                            cctx.shared().exchange().forcePreloadExchange(exchFut);
+                        }
+                    });
+                }
+            };
+
+            lastTimeoutObj.set(obj);
+
+            cctx.time().addTimeoutObject(obj);
+        }
+
+        return null;
+    }
+
+    /**
+     * Registers the demanded partitions in the rebalance future and sends demand messages to supplier nodes.
+     * Nodes supporting the new rebalancing API receive one ordered demand message per rebalance listener
+     * (partitions are distributed round-robin); older nodes are handled by a legacy {@code DemandWorker}.
+     *
+     * @param fut Future.
+     * @param assigns Assignments (supplier node to demand message).
+     * @return {@code False} if topology changed while partitions were being registered, {@code true} otherwise.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean requestPartitions(RebalanceFuture fut,
+        GridDhtPreloaderAssignments assigns) throws IgniteCheckedException {
+        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
+            if (topologyChanged(fut))
+                return false;
+
+            final ClusterNode node = e.getKey();
+
+            GridDhtPartitionDemandMessage d = e.getValue();
+
+            fut.appendPartitions(node.id(), d.partitions()); // Future preparation.
+        }
+
+        final CacheConfiguration cfg = cctx.config();
+
+        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
+            final ClusterNode node = e.getKey();
+
+            final Collection<Integer> parts;
+
+            // 'remaining' is guarded by the future's monitor: supply messages from nodes requested earlier in this
+            // loop may already be processed concurrently, and the future may be cancelled (which clears the map).
+            synchronized (fut) {
+                T2<Long, Collection<Integer>> nodeParts = fut.remaining.get(node.id());
+
+                parts = nodeParts != null ? nodeParts.get2() : null;
+            }
+
+            if (parts == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Skipping rebalancing request (future is already completed or cancelled) [cache=" +
+                        cctx.name() + ", fromNode=" + node.id() + ", topology=" + fut.topologyVersion() + ']');
+
+                continue;
+            }
+
+            GridDhtPartitionDemandMessage d = e.getValue();
+
+            // Check remote node rebalancing API version.
+            if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) {
+                U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+                    ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
+                    ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
+
+                int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
+
+                List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+
+                for (int cnt = 0; cnt < lsnrCnt; cnt++)
+                    sParts.add(new HashSet<Integer>());
+
+                Iterator<Integer> it = parts.iterator();
+
+                int cnt = 0;
+
+                // Distribute partitions between rebalance listeners round-robin.
+                while (it.hasNext())
+                    sParts.get(cnt++ % lsnrCnt).add(it.next());
+
+                for (cnt = 0; cnt < lsnrCnt; cnt++) {
+                    if (!sParts.get(cnt).isEmpty()) {
+                        // Create copy.
+                        GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+
+                        initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
+                        initD.updateSequence(fut.updateSeq);
+                        initD.timeout(cfg.getRebalanceTimeout());
+
+                        cctx.io().sendOrderedMessage(node,
+                            GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), initD.timeout());
+
+                        if (log.isDebugEnabled())
+                            log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
+                                cnt + ", partitions count=" + sParts.get(cnt).size() +
+                                " (" + partitionsList(sParts.get(cnt)) + ")]");
+                    }
+                }
+            }
+            else {
+                U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+                    ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
+                    ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
+
+                d.timeout(cfg.getRebalanceTimeout());
+                d.workerId(0); // Old api support.
+
+                DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut);
+
+                dw.run(node, d);
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Formats partition numbers as a compact, sorted list of ranges, e.g. {@code [5, 1, 2, 3, 8]} becomes
+     * {@code "1-3, 5, 8"}. Duplicate values are merged into the range they belong to.
+     *
+     * @param c Partitions.
+     * @return String representation of partitions list (empty string for an empty collection).
+     */
+    private String partitionsList(Collection<Integer> c) {
+        List<Integer> s = new ArrayList<>(c);
+
+        Collections.sort(s);
+
+        StringBuilder sb = new StringBuilder();
+
+        int i = 0;
+
+        while (i < s.size()) {
+            int start = s.get(i);
+            int end = start;
+
+            // Extend the range while the next value is a duplicate or directly follows the current end
+            // (long arithmetic avoids overflow at Integer.MAX_VALUE).
+            while (i + 1 < s.size() && s.get(i + 1) <= (long)end + 1)
+                end = s.get(++i);
+
+            if (sb.length() > 0)
+                sb.append(", ");
+
+            sb.append(start);
+
+            if (start != end)
+                sb.append("-").append(end);
+
+            i++;
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * Handles a supply message (new rebalancing API) received from a supplier node: loads supplied entries into
+     * local partitions in MOVING state, owns partitions whose last batch was received, registers missed
+     * partitions and, unless the topology changed or the future is done, demands the next batch from the supplier.
+     *
+     * @param idx Index of the rebalance topic (listener) the message belongs to.
+     * @param id Supplier node id.
+     * @param supply Supply message.
+     */
+    public void handleSupplyMessage(
+        int idx,
+        final UUID id,
+        final GridDhtPartitionSupplyMessageV2 supply) {
+        AffinityTopologyVersion topVer = supply.topologyVersion();
+
+        final RebalanceFuture fut = rebalanceFut;
+
+        ClusterNode node = cctx.node(id);
+
+        if (node == null)
+            return;
+
+        if (!fut.isActual(supply.updateSequence())) // Current future has another update sequence.
+            return; // Supply message based on another future.
+
+        if (topologyChanged(fut)) { // Topology already changed (for the future that supply message based on).
+            return;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Received supply message: " + supply);
+
+        // Check whether there were class loading errors on unmarshal
+        if (supply.classError() != null) {
+            U.warn(log, "Class got undeployed during preloading: " + supply.classError());
+
+            fut.cancel(id);
+
+            return;
+        }
+
+        final GridDhtPartitionTopology top = cctx.dht().topology();
+
+        try {
+            // Preload.
+            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+                int p = e.getKey();
+
+                if (cctx.affinity().localNode(p, topVer)) {
+                    GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+                    assert part != null;
+
+                    if (part.state() == MOVING) {
+                        boolean reserved = part.reserve();
+
+                        assert reserved : "Failed to reserve partition [gridName=" +
+                            cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+                        part.lock();
+
+                        try {
+                            // Loop through all received entries and try to preload them.
+                            for (GridCacheEntryInfo entry : e.getValue().infos()) {
+                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Preloading is not permitted for entry due to " +
+                                            "evictions [key=" + entry.key() +
+                                            ", ver=" + entry.version() + ']');
+
+                                    continue;
+                                }
+                                if (!preloadEntry(node, p, entry, topVer)) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got entries for invalid partition during " +
+                                            "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+
+                                    break;
+                                }
+                            }
+
+                            boolean last = supply.last().contains(p);
+
+                            // If message was last for this partition,
+                            // then we take ownership.
+                            if (last) {
+                                top.own(part);
+
+                                fut.partitionDone(id, p);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Finished rebalancing partition: " + part);
+                            }
+                        }
+                        finally {
+                            part.unlock();
+                            part.release();
+                        }
+                    }
+                    else {
+                        fut.partitionDone(id, p);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+                    }
+                }
+                else {
+                    fut.partitionDone(id, p);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+                }
+            }
+
+            // Only request partitions based on latest topology version.
+            for (Integer miss : supply.missed())
+                if (cctx.affinity().localNode(miss, topVer))
+                    fut.partitionMissed(id, miss);
+
+            for (Integer miss : supply.missed())
+                fut.partitionDone(id, miss);
+
+            GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
+                supply.updateSequence(), supply.topologyVersion(), cctx.cacheId());
+
+            d.timeout(cctx.config().getRebalanceTimeout());
+
+            d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
+
+            if (!topologyChanged(fut) && !fut.isDone()) {
+                // Send demand message.
+                cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+                    d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+            }
+        }
+        catch (IgniteCheckedException e) {
+            // A departed supplier is expected during topology changes; anything else (e.g. a failure to cache
+            // a rebalanced entry) stops rebalancing from this node and must not be hidden at debug level.
+            if (cctx.node(id) == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Node left during rebalancing [node=" + node.id() +
+                        ", msg=" + e.getMessage() + ']');
+            }
+            else
+                LT.error(log, e, "Error during rebalancing [cache=" + cctx.name() + ", node=" + node.id() +
+                    ", topology=" + fut.topologyVersion() + ']');
+        }
+    }
+
+    /**
+     * Loads a single entry received from a supplier into the local cache.
+     * <p>
+     * The entry is ignored (and {@code true} is returned) when the IGFS data cache space limit is exceeded,
+     * when the preload predicate rejects it, when {@code initialValue} does not accept it (logged as already
+     * present in cache), or when the cache entry is concurrently removed.
+     *
+     * @param pick Node picked for preloading.
+     * @param p Partition.
+     * @param entry Preloaded entry.
+     * @param topVer Topology version.
+     * @return {@code False} if partition has become invalid during preloading.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     * @throws IgniteCheckedException If failed to cache the entry (rethrown with rebalancing context).
+     */
+    private boolean preloadEntry(
+        ClusterNode pick,
+        int p,
+        GridCacheEntryInfo entry,
+        AffinityTopologyVersion topVer
+    ) throws IgniteCheckedException {
+        try {
+            GridCacheEntryEx cached = null;
+
+            try {
+                cached = cctx.dht().entryEx(entry.key());
+
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
+
+                // IGFS data cache exceeded its space limit: drop the entry instead of loading it.
+                if (cctx.dht().isIgfsDataCache() &&
+                    cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
+                    LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
+                        "value, will ignore rebalance entries)");
+
+                    // Clean up the entry created by entryEx() above if it holds nothing.
+                    if (cached.markObsoleteIfEmpty(null))
+                        cached.context().cache().removeIfObsolete(cached.key());
+
+                    return true;
+                }
+
+                if (preloadPred == null || preloadPred.apply(entry)) {
+                    if (cached.initialValue(
+                        entry.value(),
+                        entry.version(),
+                        entry.ttl(),
+                        entry.expireTime(),
+                        true,
+                        topVer,
+                        cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+                    )) {
+                        cctx.evicts().touch(cached, topVer); // Start tracking.
+
+                        if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
+                            cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
+                                (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
+                                false, null, null, null);
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
+                            ", part=" + p + ']');
+                }
+                else if (log.isDebugEnabled())
+                    log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                // Concurrent removal is not an error for rebalancing: skip this entry and continue.
+                if (log.isDebugEnabled())
+                    log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
+                        cached.key() + ", part=" + p + ']');
+            }
+            catch (GridDhtInvalidPartitionException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Partition became invalid during rebalancing (will ignore): " + p);
+
+                // Signals the caller to stop loading the remaining entries of this partition.
+                return false;
+            }
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            // Propagate interruption as is, without wrapping.
+            throw e;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
+                cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtPartitionDemander.class, this);
+    }
+
+    /**
+     * Remembers the most recent exchange future; delayed rebalancing is forced on it when the delay expires.
+     *
+     * @param lastFut Last future to set.
+     */
+    void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
+        this.lastExchangeFut = lastFut;
+    }
+
+    /**
+     *
+     */
+    public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
+        /** */
+        private static final long serialVersionUID = 1L;
+
+        /** Should EVT_CACHE_REBALANCE_STOPPED event be sent of not. */
+        private final boolean sendStoppedEvnt;
+
+        /** */
+        private final GridCacheContext<?, ?> cctx;
+
+        /** */
+        private final IgniteLogger log;
+
+        /** Remaining. T2: startTime, partitions */
+        private final Map<UUID, T2<Long, Collection<Integer>>> remaining = new HashMap<>();
+
+        /** Missed. */
+        private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
+
+        /** Exchange future. */
+        @GridToStringExclude
+        private final GridDhtPartitionsExchangeFuture exchFut;
+
+        /** Topology version. */
+        private final AffinityTopologyVersion topVer;
+
+        /** Unique (per demander) sequence id. */
+        private final long updateSeq;
+
+        /**
+         * @param assigns Assigns.
+         * @param cctx Context.
+         * @param log Logger.
+         * @param sentStopEvnt Stop event flag.
+         */
+        RebalanceFuture(GridDhtPreloaderAssignments assigns,
+            GridCacheContext<?, ?> cctx,
+            IgniteLogger log,
+            boolean sentStopEvnt,
+            long updateSeq) {
+            assert assigns != null;
+
+            this.exchFut = assigns.exchangeFuture();
+            this.topVer = assigns.topologyVersion();
+            this.cctx = cctx;
+            this.log = log;
+            this.sendStoppedEvnt = sentStopEvnt;
+            this.updateSeq = updateSeq;
+        }
+
+        /**
+         * Dummy future. Will be done by real one.
+         */
+        public RebalanceFuture() {
+            this.exchFut = null;
+            this.topVer = null;
+            this.cctx = null;
+            this.log = null;
+            this.sendStoppedEvnt = false;
+            this.updateSeq = -1;
+        }
+
+        /**
+         * @return Topology version.
+         */
+        public AffinityTopologyVersion topologyVersion() {
+            return topVer;
+        }
+
+        /**
+         * @param updateSeq Update sequence.
+         * @return true in case future created for specified updateSeq, false in other case.
+         */
+        private boolean isActual(long updateSeq) {
+            return this.updateSeq == updateSeq;
+        }
+
+        /**
+         * @return Is initial (created at demander creation).
+         */
+        private boolean isInitial() {
+            return topVer == null;
+        }
+
+        /**
+         * @param nodeId Node id.
+         * @param parts Parts.
+         */
+        private void appendPartitions(UUID nodeId, Collection<Integer> parts) {
+            synchronized (this) {
+                remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts));
+            }
+        }
+
+        /**
+         *
+         */
+        private void doneIfEmpty() {
+            synchronized (this) {
+                if (isDone())
+                    return;
+
+                assert remaining.isEmpty();
+
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing is not required [cache=" + cctx.name() +
+                        ", topology=" + topVer + "]");
+
+                checkIsDone();
+            }
+        }
+
+        /**
+         * Cancels this future.
+         *
+         * @return {@code true}.
+         */
+        @Override public boolean cancel() {
+            synchronized (this) {
+                if (isDone())
+                    return true;
+
+                remaining.clear();
+
+                U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
+                    + ", topology=" + topologyVersion());
+
+                checkIsDone(true /* cancelled */);
+            }
+
+            return true;
+        }
+
+        /**
+         * @param nodeId Node id.
+         */
+        private void cancel(UUID nodeId) {
+            synchronized (this) {
+                if (isDone())
+                    return;
+
+                U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+                    ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
+                    ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
+
+                remaining.remove(nodeId);
+
+                checkIsDone();
+            }
+
+        }
+
+        /**
+         * @param nodeId Node id.
+         * @param p P.
+         */
+        private void partitionMissed(UUID nodeId, int p) {
+            synchronized (this) {
+                if (isDone())
+                    return;
+
+                if (missed.get(nodeId) == null)
+                    missed.put(nodeId, new HashSet<Integer>());
+
+                missed.get(nodeId).add(p);
+            }
+        }
+
+        /**
+         * @param nodeId Node id.
+         * @param p P.
+         */
+        private void partitionDone(UUID nodeId, int p) {
+            synchronized (this) {
+                if (isDone())
+                    return;
+
+                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                    preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+                        exchFut.discoveryEvent());
+
+                Collection<Integer> parts = remaining.get(nodeId).get2();
+
+                if (parts != null) {
+                    parts.remove(p);
+
+                    if (parts.isEmpty()) {
+                        U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
+                            "rebalancing [cache=" + cctx.name() +
+                            ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
+                            ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
+
+                        remaining.remove(nodeId);
+                    }
+                }
+
+                checkIsDone();
+            }
+        }
+
+        /**
+         * @param part Partition.
+         * @param type Type.
+         * @param discoEvt Discovery event.
+         */
+        private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
+            assert discoEvt != null;
+
+            cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+        }
+
+        /**
+         * @param type Type.
+         * @param discoEvt Discovery event.
+         */
+        private void preloadEvent(int type, DiscoveryEvent discoEvt) {
+            preloadEvent(-1, type, discoEvt);
+        }
+
+        /**
+         *
+         */
+        private void checkIsDone() {
+            checkIsDone(false);
+        }
+
+        /**
+         * @param cancelled Is cancelled.
+         */
+        private void checkIsDone(boolean cancelled) {
+            if (remaining.isEmpty()) {
+                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
+                    preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+
+                if (log.isDebugEnabled())
+                    log.debug("Completed rebalance future.");
+
+                cctx.shared().exchange().scheduleResendPartitions();
+
+                Collection<Integer> m = new HashSet<>();
+
+                for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
+                    if (e.getValue() != null && !e.getValue().isEmpty())
+                        m.addAll(e.getValue());
+                }
+
+                if (!m.isEmpty()) {
+                    U.log(log, ("Reassigning partitions that were missed: " + m));
+
+                    onDone(false); //Finished but has missed partitions, will force dummy exchange
+
+                    cctx.shared().exchange().forceDummyExchange(true, exchFut);
+
+                    return;
+                }
+
+                if (!cancelled && !cctx.preloader().syncFuture().isDone())
+                    ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+
+                onDone(true);
+            }
+        }
+    }
+
+    /**
+     * Pairs a legacy supply message with the id of the node that sent it.
+     */
+    @Deprecated // Backward compatibility. To be removed in future.
+    private static class SupplyMessage {
+        /** Sender ID. */
+        private UUID sndId;
+
+        /** Supply message. */
+        private GridDhtPartitionSupplyMessage supply;
+
+        /**
+         * Creates an empty wrapper (both fields {@code null}).
+         */
+        private SupplyMessage() {
+            this(null, null);
+        }
+
+        /**
+         * @param sndId Sender ID.
+         * @param supply Supply message.
+         */
+        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
+            this.sndId = sndId;
+            this.supply = supply;
+        }
+
+        /**
+         * @return Wrapped supply message.
+         */
+        GridDhtPartitionSupplyMessage supply() {
+            return this.supply;
+        }
+
+        /**
+         * @return ID of the node the message came from.
+         */
+        UUID senderId() {
+            return this.sndId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SupplyMessage.class, this);
+        }
+    }
+
+    /** Sequence used to assign ids to legacy demand workers (old rebalancing API). */
+    @Deprecated // Backward compatibility. To be removed in future.
+    private final AtomicInteger dmIdx = new AtomicInteger(0);
+
+    /**
+     *
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private class DemandWorker {
+        /** Worker ID. */
+        private int id;
+
+        /** Partition-to-node assignments. */
+        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
+
+        /** Message queue. */
+        private final LinkedBlockingDeque<SupplyMessage> msgQ =
+            new LinkedBlockingDeque<>();
+
+        /** Counter. */
+        private long cntr;
+
+        /** Hide worker logger and use cache logger instead. */
+        private IgniteLogger log = GridDhtPartitionDemander.this.log;
+
+        private volatile RebalanceFuture fut;
+
+        /**
+         * @param id Worker ID.
+         */
+        private DemandWorker(int id, RebalanceFuture fut) {
+            assert id >= 0;
+
+            this.id = id;
+            this.fut = fut;
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void addMessage(SupplyMessage msg) {
+            msgQ.offer(msg);
+        }
+
+        /**
+         * @param deque Deque to poll from.
+         * @param time Time to wait.
+         * @return Polled item.
+         * @throws InterruptedException If interrupted.
+         */
+        @Nullable private <T> T poll(BlockingQueue<T> deque, long time) throws InterruptedException {
+            return deque.poll(time, MILLISECONDS);
+        }
+
+        /**
+         * @param idx Unique index for this topic.
+         * @return Topic for partition.
+         */
+        public Object topic(long idx) {
+            return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
+        }
+
+        /**
+         * @param node Node to demand from.
+         * @param topVer Topology version.
+         * @param d Demand message.
+         * @param exchFut Exchange future.
+         * @throws InterruptedException If interrupted.
+         * @throws ClusterTopologyCheckedException If node left.
+         * @throws IgniteCheckedException If failed to send message.
+         */
+        private void demandFromNode(
+            ClusterNode node,
+            final AffinityTopologyVersion topVer,
+            GridDhtPartitionDemandMessage d,
+            GridDhtPartitionsExchangeFuture exchFut
+        ) throws InterruptedException, IgniteCheckedException {
+            GridDhtPartitionTopology top = cctx.dht().topology();
+
+            cntr++;
+
+            d.topic(topic(cntr));
+            d.workerId(id);
+
+            if (topologyChanged(fut))
+                return;
+
+            cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
+                    addMessage(new SupplyMessage(nodeId, msg));
+                }
+            });
+
+            try {
+                boolean retry;
+
+                // DoWhile.
+                // =======
+                do {
+                    retry = false;
+
+                    // Create copy.
+                    d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2());
+
+                    long timeout = cctx.config().getRebalanceTimeout();
+
+                    d.timeout(timeout);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
+
+                    // Send demand message.
+                    cctx.io().send(node, d, cctx.ioPolicy());
+
+                    // While.
+                    // =====
+                    while (!topologyChanged(fut)) {
+                        SupplyMessage s = poll(msgQ, timeout);
+
+                        // If timed out.
+                        if (s == null) {
+                            if (msgQ.isEmpty()) { // Safety check.
+                                U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
+                                    " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
+                                    " configuration properties).");
+
+                                // Ordered listener was removed if timeout expired.
+                                cctx.io().removeOrderedHandler(d.topic());
+
+                                // Must create copy to be able to work with IO manager thread local caches.
+                                d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2());
+
+                                // Create new topic.
+                                d.topic(topic(++cntr));
+
+                                // Create new ordered listener.
+                                cctx.io().addOrderedHandler(d.topic(),
+                                    new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                                        @Override public void apply(UUID nodeId,
+                                            GridDhtPartitionSupplyMessage msg) {
+                                            addMessage(new SupplyMessage(nodeId, msg));
+                                        }
+                                    });
+
+                                // Resend message with larger timeout.
+                                retry = true;
+
+                                break; // While.
+                            }
+                            else
+                                continue; // While.
+                        }
+
+                        // Check that message was received from expected node.
+                        if (!s.senderId().equals(node.id())) {
+                            U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
+                                ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
+
+                            continue; // While.
+                        }
+
+                        if (log.isDebugEnabled())
+                            log.debug("Received supply message: " + s);
+
+                        GridDhtPartitionSupplyMessage supply = s.supply();
+
+                        // Check whether there were class loading errors on unmarshal
+                        if (supply.classError() != null) {
+                            if (log.isDebugEnabled())
+                                log.debug("Class got undeployed during preloading: " + supply.classError());
+
+                            retry = true;
+
+                            // Quit preloading.
+                            break;
+                        }
+
+                        // Preload.
+                        for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+                            int p = e.getKey();
+
+                            if (cctx.affinity().localNode(p, topVer)) {
+                                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+                                assert part != null;
+
+                                if (part.state() == MOVING) {
+                                    boolean reserved = part.reserve();
+
+                                    assert reserved : "Failed to reserve partition [gridName=" +
+                                        cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+                                    part.lock();
+
+                                    try {
+                                        Collection<Integer> invalidParts = new GridLeanSet<>();
+
+                                        // Loop through all received entries and try to preload them.
+                                        for (GridCacheEntryInfo entry : e.getValue().infos()) {
+                                            if (!invalidParts.contains(p)) {
+                                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
+                                                    if (log.isDebugEnabled())
+                                                        log.debug("Preloading is not permitted for entry due to " +
+                                                            "evictions [key=" + entry.key() +
+                                                            ", ver=" + entry.version() + ']');
+
+                                                    continue;
+                                                }
+
+                                                if (!preloadEntry(node, p, entry, topVer)) {
+                                                    invalidParts.add(p);
+
+                                                    if (log.isDebugEnabled())
+                                                        log.debug("Got entries for invalid partition during " +
+                                                            "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+                                                }
+                                            }
+                                        }
+
+                                        boolean last = supply.last().contains(p);
+
+                                        // If message was last for this partition,
+                                        // then we take ownership.
+                                        if (last) {
+                                            fut.partitionDone(node.id(), p);
+
+                                            top.own(part);
+
+                                            if (log.isDebugEnabled())
+                                                log.debug("Finished rebalancing partition: " + part);
+
+                                            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                                                preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+                                                    exchFut.discoveryEvent());
+                                        }
+                                    }
+                                    finally {
+                                        part.unlock();
+                                        part.release();
+                                    }
+                                }
+                                else {
+                                    fut.partitionDone(node.id(), p);
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+                                }
+                            }
+                            else {
+                                fut.partitionDone(node.id(), p);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+                            }
+                        }
+
+                        // Only request partitions based on latest topology version.
+                        for (Integer miss : s.supply().missed()) {
+                            if (cctx.affinity().localNode(miss, topVer))
+                                fut.partitionMissed(node.id(), miss);
+                        }
+
+                        if (fut.remaining.get(node.id()) == null)
+                            break; // While.
+
+                        if (s.supply().ack()) {
+                            retry = true;
+
+                            break;
+                        }
+                    }
+                }
+                while (retry && !topologyChanged(fut));
+            }
+            finally {
+                cctx.io().removeOrderedHandler(d.topic());
+            }
+        }
+
+        /**
+         * Demands partitions from the given supplier node within the current rebalancing future.
+         * <p>
+         * The read lock of {@code demandLock} is held for the whole call. The exchange future and
+         * topology version passed to {@code demandFromNode} are taken from {@code fut}.
+         *
+         * @param node Supplier node to demand partitions from.
+         * @param d Demand message to send to {@code node}.
+         * @throws IgniteCheckedException Propagated from {@code demandFromNode}, or wrapping an
+         *      {@link InterruptedException} raised while demanding. In the latter case the current
+         *      thread's interrupt status is restored before the exception is thrown.
+         */
+        public void run(ClusterNode node, GridDhtPartitionDemandMessage d) throws IgniteCheckedException {
+            demandLock.readLock().lock();
+
+            try {
+                GridDhtPartitionsExchangeFuture exchFut = fut.exchFut;
+
+                AffinityTopologyVersion topVer = fut.topVer;
+
+                try {
+                    demandFromNode(node, topVer, d, exchFut);
+                }
+                catch (InterruptedException e) {
+                    // Restore the interrupt status so code further up the stack can still see the interruption.
+                    Thread.currentThread().interrupt();
+
+                    throw new IgniteCheckedException(e);
+                }
+            }
+            finally {
+                demandLock.readLock().unlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
+        }
+    }
+}