You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/31 16:26:53 UTC

[3/6] ignite git commit: Ignite-1093 Improved rebalancing

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
new file mode 100644
index 0000000..4fe2153
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -0,0 +1,1362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.timeout.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+import static org.apache.ignite.internal.processors.dr.GridDrType.*;
+
+/**
+ * Thread pool for requesting partitions from other nodes and populating local cache.
+ */
+@SuppressWarnings("NonConstantFieldWithUpperCaseName")
+public class GridDhtPartitionDemander {
+    /** */
+    private final GridCacheContext<?, ?> cctx;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** Preload predicate. */
+    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+    /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
+    @GridToStringInclude
+    private volatile SyncFuture syncFut;
+
+    /** Last timeout object. */
+    private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
+
+    /** Last exchange future. */
+    private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
+
+    /** Demand lock. */
+    private final ReadWriteLock demandLock;
+
+    /**
+     * @param cctx Cctx.
+     * @param demandLock Demand lock.
+     */
+    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock demandLock) {
+        assert cctx != null;
+
+        this.cctx = cctx;
+        this.demandLock = demandLock;
+
+        log = cctx.logger(getClass());
+
+        boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
+
+        syncFut = new SyncFuture(null);
+
+        if (!enabled)
+            // Calling onDone() immediately since preloading is disabled.
+            syncFut.onDone();
+    }
+
+    /**
+     *
+     */
+    void start() {
+    }
+
+    /**
+     *
+     */
+    void stop() {
+        lastExchangeFut = null;
+
+        lastTimeoutObj.set(null);
+    }
+
+    /**
+     * @return Future for {@link CacheRebalanceMode#SYNC} mode.
+     */
+    IgniteInternalFuture<?> syncFuture() {
+        return syncFut;
+    }
+
+    /**
+     * Sets preload predicate for demand pool.
+     *
+     * @param preloadPred Preload predicate.
+     */
+    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+        this.preloadPred = preloadPred;
+    }
+
+    /**
+     * Force preload.
+     */
+    void forcePreload() {
+        GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
+
+        if (obj != null)
+            cctx.time().removeTimeoutObject(obj);
+
+        final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
+
+        if (exchFut != null) {
+            if (log.isDebugEnabled())
+                log.debug("Forcing rebalance event for future: " + exchFut);
+
+            exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                    cctx.shared().exchange().forcePreloadExchange(exchFut);
+                }
+            });
+        }
+        else if (log.isDebugEnabled())
+            log.debug("Ignoring force rebalance request (no topology event happened yet).");
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return {@code True} if topology changed.
+     */
+    private boolean topologyChanged(AffinityTopologyVersion topVer) {
+        return !cctx.affinity().affinityTopologyVersion().equals(topVer);
+    }
+
+    /**
+     * @param type Type.
+     * @param discoEvt Discovery event.
+     */
+    private void preloadEvent(int type, DiscoveryEvent discoEvt) {
+        preloadEvent(-1, type, discoEvt);
+    }
+
+    /**
+     * @param part Partition.
+     * @param type Type.
+     * @param discoEvt Discovery event.
+     */
+    private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
+        assert discoEvt != null;
+
+        cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+    }
+
+    /**
+     * @param assigns Assignments.
+     * @param force {@code True} if dummy reassign.
+     * @throws IgniteCheckedException
+     */
+
+    void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Adding partition assignments: " + assigns);
+
+        long delay = cctx.config().getRebalanceDelay();
+
+        if (delay == 0 || force) {
+            assert assigns != null;
+
+            final AffinityTopologyVersion topVer = assigns.topologyVersion();
+
+            SyncFuture fut = syncFut;
+
+            if (fut.isInited()) {
+                if (!fut.isDone())
+                    fut.onCancel();
+
+                fut = new SyncFuture(assigns);
+
+                syncFut = fut;
+            }
+            else
+                fut.init(assigns);
+
+            if (assigns.isEmpty()) {
+                fut.onDone();
+
+                return;
+            }
+
+            if (topologyChanged(topVer)) {
+                fut.onCancel();
+
+                return;
+            }
+
+            final SyncFuture cSF = fut;
+
+            new IgniteThread(cctx.gridName(), "demand-thread-" + cctx.cache().name(), new Runnable() {
+                @Override public void run() {
+                    if (!CU.isMarshallerCache(cctx.name())) {
+                        if (log.isDebugEnabled())
+                            log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
+
+                        try {
+                            IgniteInternalFuture fut = cctx.kernalContext().cache().marshallerCache().preloader().syncFuture();
+
+                            if (!topologyChanged(topVer))
+                                fut.get();
+                            else {
+                                cSF.onCancel();
+
+                                return;
+                            }
+                        }
+                        catch (IgniteInterruptedCheckedException ignored) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
+                                    "[cacheName=" + cctx.name() + ']');
+                                cSF.onCancel();
+
+                                return;
+                            }
+                        }
+                        catch (IgniteCheckedException e) {
+                            cSF.onCancel();
+
+                            throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
+                        }
+                    }
+
+                    int rebalanceOrder = cctx.config().getRebalanceOrder();
+
+                    if (rebalanceOrder > 0) {
+                        IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
+
+                        try {
+                            if (fut != null) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
+                                        ", rebalanceOrder=" + rebalanceOrder + ']');
+
+                                if (!topologyChanged(topVer))
+                                    fut.get();
+                                else {
+                                    cSF.onCancel();
+
+                                    return;
+                                }
+                            }
+                        }
+                        catch (IgniteInterruptedCheckedException ignored) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
+                                    "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
+                                cSF.onCancel();
+
+                                return;
+                            }
+                        }
+                        catch (IgniteCheckedException e) {
+                            cSF.onCancel();
+
+                            throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
+                        }
+                    }
+
+                    requestPartitions(cSF);
+                }
+            }).start();
+
+        }
+        else if (delay > 0) {
+            GridTimeoutObject obj = lastTimeoutObj.get();
+
+            if (obj != null)
+                cctx.time().removeTimeoutObject(obj);
+
+            final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
+
+            assert exchFut != null : "Delaying rebalance process without topology event.";
+
+            obj = new GridTimeoutObjectAdapter(delay) {
+                @Override public void onTimeout() {
+                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
+                            cctx.shared().exchange().forcePreloadExchange(exchFut);
+                        }
+                    });
+                }
+            };
+
+            lastTimeoutObj.set(obj);
+
+            cctx.time().addTimeoutObject(obj);
+        }
+    }
+
+    /**
+     * @param fut Future.
+     */
+    private void requestPartitions(SyncFuture fut) {
+        final GridDhtPreloaderAssignments assigns = fut.assigns;
+
+        AffinityTopologyVersion topVer = fut.topologyVersion();
+
+        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
+            if (topologyChanged(topVer)) {
+                fut.onCancel();
+
+                return;
+            }
+
+            final ClusterNode node = e.getKey();
+
+            GridDhtPartitionDemandMessage d = e.getValue();
+
+            d.timeout(cctx.config().getRebalanceTimeout());
+            d.workerId(0);//old api support.
+
+            final CacheConfiguration cfg = cctx.config();
+
+            final long start = U.currentTimeMillis();
+
+            fut.logStart(node.id(), start);
+
+            U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+                ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
+
+            //Check remote node rebalancing API version.
+            if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) {
+                GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
+
+                remainings.addAll(d.partitions());
+
+                fut.append(node.id(), remainings);
+
+                int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2);
+
+                List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+
+                for (int cnt = 0; cnt < lsnrCnt; cnt++)
+                    sParts.add(new HashSet<Integer>());
+
+                Iterator<Integer> it = d.partitions().iterator();
+
+                int cnt = 0;
+
+                while (it.hasNext())
+                    sParts.get(cnt++ % lsnrCnt).add(it.next());
+
+                for (cnt = 0; cnt < lsnrCnt; cnt++) {
+
+                    if (!sParts.get(cnt).isEmpty()) {
+
+                        // Create copy.
+                        GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+
+                        initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt));
+
+                        try {
+                            if (!topologyChanged(topVer))
+                                cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
+                            else
+                                fut.onCancel();
+                        }
+                        catch (IgniteCheckedException ex) {
+                            fut.onCancel();
+
+                            U.error(log, "Failed to send partition demand message to node", ex);
+                        }
+
+                        if (log.isDebugEnabled())
+                            log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]");
+                    }
+                }
+            }
+            else {
+                DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut);
+
+                fut.append(node.id(), d.partitions());
+
+                dw.run(node, d);
+            }
+        }
+    }
+
+    /**
+     * @param c Partitions.
+     * @return String representation of partitions list.
+     */
+    private String partitionsList(Collection<Integer> c) {
+        LinkedList<Integer> s = new LinkedList<>(c);
+
+        Collections.sort(s);
+
+        StringBuilder sb = new StringBuilder();
+
+        int start = -1;
+
+        int prev = -1;
+
+        Iterator<Integer> sit = s.iterator();
+
+        while (sit.hasNext()) {
+            int p = sit.next();
+            if (start == -1) {
+                start = p;
+                prev = p;
+            }
+
+            if (prev < p - 1) {
+                sb.append(start);
+
+                if (start != prev)
+                    sb.append("-").append(prev);
+
+                sb.append(", ");
+
+                start = p;
+            }
+
+            if (!sit.hasNext()) {
+                sb.append(start);
+
+                if (start != p)
+                    sb.append("-").append(p);
+            }
+
+            prev = p;
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * @param idx Index.
+     * @param id Node id.
+     * @param supply Supply.
+     */
+    public void handleSupplyMessage(
+        int idx,
+        final UUID id,
+        final GridDhtPartitionSupplyMessageV2 supply) {
+        AffinityTopologyVersion topVer = supply.topologyVersion();
+
+        final SyncFuture fut = syncFut;
+
+        if (topologyChanged(topVer)) {
+            fut.onCancel();
+
+            return;
+        }
+
+        ClusterNode node = cctx.node(id);
+
+        assert node != null;
+
+        if (log.isDebugEnabled())
+            log.debug("Received supply message: " + supply);
+
+        // Check whether there were class loading errors on unmarshal
+        if (supply.classError() != null) {
+            if (log.isDebugEnabled())
+                log.debug("Class got undeployed during preloading: " + supply.classError());
+
+            fut.onCancel(id);
+
+            return;
+        }
+
+        final GridDhtPartitionTopology top = cctx.dht().topology();
+
+        try {
+            // Preload.
+            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+                if (topologyChanged(topVer)) {
+                    fut.onCancel();
+
+                    return;
+                }
+
+                int p = e.getKey();
+
+                if (cctx.affinity().localNode(p, topVer)) {
+                    GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+                    assert part != null;
+
+                    if (part.state() == MOVING) {
+                        boolean reserved = part.reserve();
+
+                        assert reserved : "Failed to reserve partition [gridName=" +
+                            cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+                        part.lock();
+
+                        try {
+                            // Loop through all received entries and try to preload them.
+                            for (GridCacheEntryInfo entry : e.getValue().infos()) {
+                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Preloading is not permitted for entry due to " +
+                                            "evictions [key=" + entry.key() +
+                                            ", ver=" + entry.version() + ']');
+
+                                    continue;
+                                }
+                                if (!preloadEntry(node, p, entry, topVer)) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got entries for invalid partition during " +
+                                            "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+
+                                    break;
+                                }
+                            }
+
+                            boolean last = supply.last().contains(p);
+
+                            // If message was last for this partition,
+                            // then we take ownership.
+                            if (last) {
+                                top.own(part);
+
+                                fut.onPartitionDone(id, p);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Finished rebalancing partition: " + part);
+                            }
+                        }
+                        finally {
+                            part.unlock();
+                            part.release();
+                        }
+                    }
+                    else {
+                        fut.onPartitionDone(id, p);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+                    }
+                }
+                else {
+                    fut.onPartitionDone(id, p);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+                }
+            }
+
+            // Only request partitions based on latest topology version.
+            for (Integer miss : supply.missed())
+                if (cctx.affinity().localNode(miss, topVer))
+                    fut.onMissedPartition(id, miss);
+
+            for (Integer miss : supply.missed())
+                fut.onPartitionDone(id, miss);
+
+            if (!fut.isDone()) {
+                GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
+
+                if (d != null) {
+                    // Create copy.
+                    GridDhtPartitionDemandMessage nextD =
+                        new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
+
+                    nextD.topic(GridCachePartitionExchangeManager.demanderTopic(idx));
+
+                    if (!topologyChanged(topVer)) {
+                        // Send demand message.
+                        cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(idx),
+                            nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+                    }
+                    else
+                        fut.onCancel();
+                }
+            }
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Node left during rebalancing [node=" + node.id() +
+                    ", msg=" + e.getMessage() + ']');
+            fut.onCancel();
+        }
+        catch (IgniteCheckedException ex) {
+            U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+                "fully finish) [node=" + node.id() + ", msg=" + supply + ']', ex);
+
+            fut.onCancel(node.id());
+        }
+    }
+
+    /**
+     * @param pick Node picked for preloading.
+     * @param p Partition.
+     * @param entry Preloaded entry.
+     * @param topVer Topology version.
+     * @return {@code False} if partition has become invalid during preloading.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    private boolean preloadEntry(
+        ClusterNode pick,
+        int p,
+        GridCacheEntryInfo entry,
+        AffinityTopologyVersion topVer
+    ) throws IgniteCheckedException {
+        try {
+            GridCacheEntryEx cached = null;
+
+            try {
+                cached = cctx.dht().entryEx(entry.key());
+
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
+
+                if (cctx.dht().isIgfsDataCache() &&
+                    cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
+                    LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
+                        "value, will ignore rebalance entries)");
+
+                    if (cached.markObsoleteIfEmpty(null))
+                        cached.context().cache().removeIfObsolete(cached.key());
+
+                    return true;
+                }
+
+                if (preloadPred == null || preloadPred.apply(entry)) {
+                    if (cached.initialValue(
+                        entry.value(),
+                        entry.version(),
+                        entry.ttl(),
+                        entry.expireTime(),
+                        true,
+                        topVer,
+                        cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+                    )) {
+                        cctx.evicts().touch(cached, topVer); // Start tracking.
+
+                        if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
+                            cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
+                                (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
+                                false, null, null, null);
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
+                            ", part=" + p + ']');
+                }
+                else if (log.isDebugEnabled())
+                    log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
+                        cached.key() + ", part=" + p + ']');
+            }
+            catch (GridDhtInvalidPartitionException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Partition became invalid during rebalancing (will ignore): " + p);
+
+                return false;
+            }
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw e;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
+                cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtPartitionDemander.class, this);
+    }
+
+    /**
+     * Sets last exchange future.
+     *
+     * @param lastFut Last future to set.
+     */
+    void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
+        lastExchangeFut = lastFut;
+    }
+
+    /**
+     *
+     */
+    public class SyncFuture extends GridFutureAdapter<Boolean> {
+        /** */
+        private static final long serialVersionUID = 1L;
+
+        /** Remaining. */
+        private ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new ConcurrentHashMap8<>();
+
+        /** Missed. */
+        private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
+
+        /** Started. */
+        private ConcurrentHashMap8<UUID, Long> started = new ConcurrentHashMap8<>();
+
+        /** Lock. */
+        private Lock lock = new ReentrantLock();
+
+        /** Listener. */
+        private volatile GridLocalEventListener lsnr;
+
+        /** Assignments. */
+        private volatile GridDhtPreloaderAssignments assigns;
+
+        /** Completed. */
+        private volatile boolean completed = true;
+
+        /**
+         * @param assigns Assigns.
+         */
+        SyncFuture(GridDhtPreloaderAssignments assigns) {
+            this.assigns = assigns;
+        }
+
+        /**
+         * @return Topology version.
+         */
+        public AffinityTopologyVersion topologyVersion() {
+            return assigns != null ? assigns.topologyVersion() : null;
+        }
+
+        /**
+         * @param assigns Assigns.
+         */
+        void init(GridDhtPreloaderAssignments assigns) {
+            final SyncFuture fut = this;
+
+            lsnr = new GridLocalEventListener() {
+                @Override public void onEvent(Event evt) {
+                    fut.onCancel();
+                }
+            };
+
+            cctx.events().addListener(lsnr, EVT_NODE_FAILED);
+
+            this.assigns = assigns;
+        }
+
+        /**
+         * @return Initialised or not.
+         */
+        boolean isInited() {
+            return assigns != null;
+        }
+
+        /**
+         * @param nodeId Node id.
+         * @param parts Parts.
+         */
+        void append(UUID nodeId, Collection<Integer> parts) {
+            remaining.put(nodeId, parts);
+
+            missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+        }
+
+        /**
+         * @param nodeId Node id.
+         * @param time Time.
+         */
+        void logStart(UUID nodeId, long time) {
+            started.put(nodeId, time);
+        }
+
+        /**
+         * @param node Node.
+         */
+        GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
+            if (isDone())
+                return null;
+
+            return assigns.get(node);
+        }
+
+        /**
+         *
+         */
+        void onCancel() {
+            lock.lock();
+            try {
+                if (isDone())
+                    return;
+
+                remaining.clear();
+
+                completed = false;
+
+                U.log(log, (!completed ? "Cancelled" : "Completed") + " rebalancing from all nodes [cache=" + cctx.name()
+                    + ", topology=" + topologyVersion() +
+                    ", time=" +
+                    (started.isEmpty() ? 0 : (U.currentTimeMillis() - Collections.min(started.values()))) + " ms]");
+
+                checkIsDone();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        /**
+         * @param nodeId Node id.
+         */
+        void onCancel(UUID nodeId) {
+            lock.lock();
+            try {
+                if (isDone())
+                    return;
+
+                remaining.remove(nodeId);
+
+                completed = false;
+
+                U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+                    ", from node=" + nodeId + ", topology=" + topologyVersion() +
+                    ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]"));
+
+                checkIsDone();
+            }
+            finally {
+                lock.unlock();
+            }
+
+        }
+
+        /**
+         * @return Is completed.
+         */
+        boolean isCompleted() {
+            return completed;
+        }
+
+        /**
+         * @param nodeId Node id.
+         * @param p P.
+         */
+        void onMissedPartition(UUID nodeId, int p) {
+            lock.lock();
+            try {
+                if (isDone())
+                    return;
+
+                if (missed.get(nodeId) == null)
+                    missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+
+                missed.get(nodeId).add(p);
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        /**
+         * @param nodeId Node id.
+         * @param p P.
+         */
+        void onPartitionDone(UUID nodeId, int p) {
+            lock.lock();
+            try {
+                if (isDone())
+                    return;
+
+                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                    preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+                        assigns.exchangeFuture().discoveryEvent());
+
+                Collection<Integer> parts = remaining.get(nodeId);
+
+                if (parts != null) {
+                    parts.remove(p);
+
+                    if (parts.isEmpty()) {
+                        remaining.remove(nodeId);
+
+                        U.log(log, ("Completed rebalancing [cache=" + cctx.name() +
+                            ", from node=" + nodeId + ", topology=" + topologyVersion() +
+                            ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]"));
+                    }
+                }
+
+                checkIsDone();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        /**
+         *
+         */
+        private void checkIsDone() {
+            if (remaining.isEmpty()) {
+                if (log.isDebugEnabled())
+                    log.debug("Completed sync future.");
+
+                Collection<Integer> m = new HashSet<>();
+
+                for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
+                    if (e.getValue() != null && !e.getValue().isEmpty())
+                        m.addAll(e.getValue());
+                }
+
+                if (!m.isEmpty()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Reassigning partitions that were missed: " + m);
+
+                    cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture());
+                }
+
+                cctx.shared().exchange().scheduleResendPartitions();
+
+                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED))
+                    preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent());
+
+                if (lsnr != null)
+                    cctx.events().removeListener(lsnr);
+
+                onDone(completed);
+
+                missed.clear();
+                remaining.clear();
+                started.clear();
+                assigns.clear();
+            }
+        }
+    }
+
+    /**
+     * Supply message wrapper.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private static class SupplyMessage {
+        /** Sender ID. */
+        private UUID sndId;
+
+        /** Supply message. */
+        private GridDhtPartitionSupplyMessage supply;
+
+        /**
+         * Dummy constructor.
+         */
+        private SupplyMessage() {
+            // No-op.
+        }
+
+        /**
+         * @param sndId Sender ID.
+         * @param supply Supply message.
+         */
+        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
+            this.sndId = sndId;
+            this.supply = supply;
+        }
+
+        /**
+         * @return Sender ID.
+         */
+        UUID senderId() {
+            return sndId;
+        }
+
+        /**
+         * @return Message.
+         */
+        GridDhtPartitionSupplyMessage supply() {
+            return supply;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SupplyMessage.class, this);
+        }
+    }
+
+    /** DemandWorker index. */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private final AtomicInteger dmIdx = new AtomicInteger();
+
+    /**
+     *
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private class DemandWorker {
+        /** Worker ID. */
+        private int id;
+
+        /** Partition-to-node assignments. */
+        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
+
+        /** Message queue. */
+        private final LinkedBlockingDeque<SupplyMessage> msgQ =
+            new LinkedBlockingDeque<>();
+
+        /** Counter. */
+        private long cntr;
+
+        /** Hide worker logger and use cache logger instead. */
+        private IgniteLogger log = GridDhtPartitionDemander.this.log;
+
+        private volatile SyncFuture fut;
+
+        /**
+         * @param id Worker ID.
+         */
+        private DemandWorker(int id, SyncFuture fut) {
+            assert id >= 0;
+
+            this.id = id;
+            this.fut = fut;
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void addMessage(SupplyMessage msg) {
+            msgQ.offer(msg);
+        }
+
+        /**
+         * @param deque Deque to poll from.
+         * @param time Time to wait.
+         * @return Polled item.
+         * @throws InterruptedException If interrupted.
+         */
+        @Nullable private <T> T poll(BlockingQueue<T> deque, long time) throws InterruptedException {
+            return deque.poll(time, MILLISECONDS);
+        }
+
+        /**
+         * @param idx Unique index for this topic.
+         * @return Topic for partition.
+         */
+        public Object topic(long idx) {
+            return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
+        }
+
+        /**
+         * @param node Node to demand from.
+         * @param topVer Topology version.
+         * @param d Demand message.
+         * @param exchFut Exchange future.
+         * @return Missed partitions.
+         * @throws InterruptedException If interrupted.
+         * @throws ClusterTopologyCheckedException If node left.
+         * @throws IgniteCheckedException If failed to send message.
+         */
+        private Set<Integer> demandFromNode(
+            ClusterNode node,
+            final AffinityTopologyVersion topVer,
+            GridDhtPartitionDemandMessage d,
+            GridDhtPartitionsExchangeFuture exchFut
+        ) throws InterruptedException, IgniteCheckedException {
+            GridDhtPartitionTopology top = cctx.dht().topology();
+
+            cntr++;
+
+            d.topic(topic(cntr));
+            d.workerId(id);
+
+            Set<Integer> missed = new HashSet<>();
+
+            // Get the same collection that will be sent in the message.
+            Collection<Integer> remaining = d.partitions();
+
+            if (topologyChanged(topVer))
+                return missed;
+
+            cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
+                    addMessage(new SupplyMessage(nodeId, msg));
+                }
+            });
+
+            try {
+                boolean retry;
+
+                // DoWhile.
+                // =======
+                do {
+                    retry = false;
+
+                    // Create copy.
+                    d = new GridDhtPartitionDemandMessage(d, remaining);
+
+                    long timeout = cctx.config().getRebalanceTimeout();
+
+                    d.timeout(timeout);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
+
+                    // Send demand message.
+                    cctx.io().send(node, d, cctx.ioPolicy());
+
+                    // While.
+                    // =====
+                    while (!topologyChanged(topVer)) {
+                        SupplyMessage s = poll(msgQ, timeout);
+
+                        // If timed out.
+                        if (s == null) {
+                            if (msgQ.isEmpty()) { // Safety check.
+                                U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
+                                    " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
+                                    " configuration properties).");
+
+                                // Ordered listener was removed if timeout expired.
+                                cctx.io().removeOrderedHandler(d.topic());
+
+                                // Must create copy to be able to work with IO manager thread local caches.
+                                d = new GridDhtPartitionDemandMessage(d, remaining);
+
+                                // Create new topic.
+                                d.topic(topic(++cntr));
+
+                                // Create new ordered listener.
+                                cctx.io().addOrderedHandler(d.topic(),
+                                    new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                                        @Override public void apply(UUID nodeId,
+                                            GridDhtPartitionSupplyMessage msg) {
+                                            addMessage(new SupplyMessage(nodeId, msg));
+                                        }
+                                    });
+
+                                // Resend message with larger timeout.
+                                retry = true;
+
+                                break; // While.
+                            }
+                            else
+                                continue; // While.
+                        }
+
+                        // Check that message was received from expected node.
+                        if (!s.senderId().equals(node.id())) {
+                            U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
+                                ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
+
+                            continue; // While.
+                        }
+
+                        if (log.isDebugEnabled())
+                            log.debug("Received supply message: " + s);
+
+                        GridDhtPartitionSupplyMessage supply = s.supply();
+
+                        // Check whether there were class loading errors on unmarshal
+                        if (supply.classError() != null) {
+                            if (log.isDebugEnabled())
+                                log.debug("Class got undeployed during preloading: " + supply.classError());
+
+                            retry = true;
+
+                            // Quit preloading.
+                            break;
+                        }
+
+                        // Preload.
+                        for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+                            int p = e.getKey();
+
+                            if (cctx.affinity().localNode(p, topVer)) {
+                                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+                                assert part != null;
+
+                                if (part.state() == MOVING) {
+                                    boolean reserved = part.reserve();
+
+                                    assert reserved : "Failed to reserve partition [gridName=" +
+                                        cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+                                    part.lock();
+
+                                    try {
+                                        Collection<Integer> invalidParts = new GridLeanSet<>();
+
+                                        // Loop through all received entries and try to preload them.
+                                        for (GridCacheEntryInfo entry : e.getValue().infos()) {
+                                            if (!invalidParts.contains(p)) {
+                                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
+                                                    if (log.isDebugEnabled())
+                                                        log.debug("Preloading is not permitted for entry due to " +
+                                                            "evictions [key=" + entry.key() +
+                                                            ", ver=" + entry.version() + ']');
+
+                                                    continue;
+                                                }
+
+                                                if (!preloadEntry(node, p, entry, topVer)) {
+                                                    invalidParts.add(p);
+
+                                                    if (log.isDebugEnabled())
+                                                        log.debug("Got entries for invalid partition during " +
+                                                            "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+                                                }
+                                            }
+                                        }
+
+                                        boolean last = supply.last().contains(p);
+
+                                        // If message was last for this partition,
+                                        // then we take ownership.
+                                        if (last) {
+                                            remaining.remove(p);
+                                            fut.onPartitionDone(node.id(), p);
+
+                                            top.own(part);
+
+                                            if (log.isDebugEnabled())
+                                                log.debug("Finished rebalancing partition: " + part);
+
+                                            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                                                preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+                                                    exchFut.discoveryEvent());
+                                        }
+                                    }
+                                    finally {
+                                        part.unlock();
+                                        part.release();
+                                    }
+                                }
+                                else {
+                                    remaining.remove(p);
+                                    fut.onPartitionDone(node.id(), p);
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+                                }
+                            }
+                            else {
+                                remaining.remove(p);
+                                fut.onPartitionDone(node.id(), p);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+                            }
+                        }
+
+                        remaining.removeAll(s.supply().missed());
+
+                        // Only request partitions based on latest topology version.
+                        for (Integer miss : s.supply().missed()) {
+                            if (cctx.affinity().localNode(miss, topVer))
+                                missed.add(miss);
+
+                            fut.onMissedPartition(node.id(), miss);
+                        }
+
+                        if (remaining.isEmpty())
+                            break; // While.
+
+                        if (s.supply().ack()) {
+                            retry = true;
+
+                            break;
+                        }
+                    }
+                }
+                while (retry && !topologyChanged(topVer));
+
+                return missed;
+            }
+            finally {
+                cctx.io().removeOrderedHandler(d.topic());
+            }
+        }
+
+        /**
+         * @param node Node.
+         * @param d D.
+         */
+        public void run(ClusterNode node, GridDhtPartitionDemandMessage d) {
+            demandLock.readLock().lock();
+
+            try {
+                GridDhtPartitionsExchangeFuture exchFut = fut.assigns.exchangeFuture();
+
+                AffinityTopologyVersion topVer = fut.assigns.topologyVersion();
+
+                Collection<Integer> missed = new HashSet<>();
+
+                if (topologyChanged(topVer)) {
+                    fut.onCancel();
+
+                    return;
+                }
+
+                try {
+                    Set<Integer> set = demandFromNode(node, topVer, d, exchFut);
+
+                    if (!set.isEmpty()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" +
+                                set + ']');
+
+                        missed.addAll(set);
+                    }
+                }
+                catch (ClusterTopologyCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
+                            ", msg=" + e.getMessage() + ']');
+
+                    fut.onCancel();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
+
+                    fut.onCancel(node.id());
+                }
+                catch (InterruptedException e) {
+                    fut.onCancel();
+                }
+            }
+            finally {
+                demandLock.readLock().unlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
new file mode 100644
index 0000000..0686376
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -0,0 +1,783 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jsr166.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Thread pool for supplying partitions to demanding nodes.
+ */
+class GridDhtPartitionSupplier {
+    /** */
+    private final GridCacheContext<?, ?> cctx;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private GridDhtPartitionTopology top;
+
+    /** */
+    private final boolean depEnabled;
+
+    /** Preload predicate. */
+    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+    /** Supply context map. */
+    private ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
+
+    /** Done map. */
+    private ConcurrentHashMap8<T2, Boolean> doneMap = new ConcurrentHashMap8<>();
+
+    /**
+     * @param cctx Cache context.
+     */
+    GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) {
+        assert cctx != null;
+
+        this.cctx = cctx;
+
+        log = cctx.logger(getClass());
+
+        top = cctx.dht().topology();
+
+        depEnabled = cctx.gridDeploy().enabled();
+    }
+
+    /**
+     *
+     */
+    void start() {
+        startOldListeners();
+    }
+
+    /**
+     *
+     */
+    void stop() {
+        top = null;
+    }
+
+    /**
+     * Sets preload predicate for supply pool.
+     *
+     * @param preloadPred Preload predicate.
+     */
+    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+        this.preloadPred = preloadPred;
+    }
+
+    /**
+     * @param d Demand message.
+     * @param id Node uuid.
+     */
+    public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d) {
+        assert d != null;
+        assert id != null;
+
+        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
+            return;
+
+        GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(d.workerId(),
+            d.updateSequence(), cctx.cacheId(), d.topologyVersion());
+
+        long preloadThrottle = cctx.config().getRebalanceThrottle();
+
+        ClusterNode node = cctx.discovery().node(id);
+
+        T2<UUID, Object> scId = new T2<>(id, d.topic());
+
+        try {
+            if (!d.partitions().isEmpty()) {//Only initial request contains partitions.
+                doneMap.remove(scId);
+                scMap.remove(scId);
+            }
+
+            SupplyContext sctx = scMap.remove(scId);
+
+            if (doneMap.get(scId) != null)
+                return;
+
+            long bCnt = 0;
+
+            int phase = 0;
+
+            boolean newReq = true;
+
+            long maxBatchesCnt = cctx.config().getRebalanceBatchesCount();
+
+            if (sctx != null) {
+                phase = sctx.phase;
+
+                maxBatchesCnt = 1;
+            }
+
+            Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
+
+            while ((sctx != null && newReq) || partIt.hasNext()) {
+                int part = sctx != null && newReq ? sctx.part : partIt.next();
+
+                newReq = false;
+
+                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+
+                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                    // Reply with partition of "-1" to let sender know that
+                    // this node is no longer an owner.
+                    s.missed(part);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Requested partition is not owned by local node [part=" + part +
+                            ", demander=" + id + ']');
+
+                    continue;
+                }
+
+                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+                try {
+                    if (phase == 0 && cctx.isSwapOrOffheapEnabled()) {
+                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+                        cctx.swap().addOffHeapListener(part, swapLsnr);
+                        cctx.swap().addSwapListener(part, swapLsnr);
+                    }
+
+                    boolean partMissing = false;
+
+                    if (phase == 0)
+                        phase = 1;
+
+                    if (phase == 1) {
+                        Iterator<GridDhtCacheEntry> entIt = sctx != null ?
+                            (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();
+
+                        while (entIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition, so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition [part=" + part +
+                                        ", nodeId=" + id + ']');
+
+                                partMissing = true;
+
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (!reply(node, d, s))
+                                    return;
+
+                                // Throttle preloading.
+                                if (preloadThrottle > 0)
+                                    U.sleep(preloadThrottle);
+
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
+
+                                    swapLsnr = null;
+
+                                    return;
+                                }
+                                else {
+                                    s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+                                        cctx.cacheId(), d.topologyVersion());
+                                }
+                            }
+
+                            GridCacheEntryEx e = entIt.next();
+
+                            GridCacheEntryInfo info = e.info();
+
+                            if (info != null && !info.isNew()) {
+                                if (preloadPred == null || preloadPred.apply(info))
+                                    s.addEntry(part, info, cctx);
+                                else if (log.isDebugEnabled())
+                                    log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                        info);
+                            }
+                        }
+
+                        if (partMissing)
+                            continue;
+
+                    }
+
+                    if (phase == 1)
+                        phase = 2;
+
+                    if (phase == 2 && cctx.isSwapOrOffheapEnabled()) {
+                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = sctx != null ?
+                            (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
+                            cctx.swap().iterator(part);
+
+                        // Iterator may be null if space does not exist.
+                        if (iter != null) {
+                            try {
+                                boolean prepared = false;
+
+                                while (iter.hasNext()) {
+                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                        // Demander no longer needs this partition,
+                                        // so we send '-1' partition and move on.
+                                        s.missed(part);
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Demanding node does not need requested partition " +
+                                                "[part=" + part + ", nodeId=" + id + ']');
+
+                                        partMissing = true;
+
+                                        break; // For.
+                                    }
+
+                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                        if (!reply(node, d, s))
+                                            return;
+
+                                        // Throttle preloading.
+                                        if (preloadThrottle > 0)
+                                            U.sleep(preloadThrottle);
+
+                                        if (++bCnt >= maxBatchesCnt) {
+                                            saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
+
+                                            swapLsnr = null;
+
+                                            return;
+                                        }
+                                        else {
+                                            s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+                                                cctx.cacheId(), d.topologyVersion());
+                                        }
+                                    }
+
+                                    Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
+
+                                    GridCacheSwapEntry swapEntry = e.getValue();
+
+                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                                    info.keyBytes(e.getKey());
+                                    info.ttl(swapEntry.ttl());
+                                    info.expireTime(swapEntry.expireTime());
+                                    info.version(swapEntry.version());
+                                    info.value(swapEntry.value());
+
+                                    if (preloadPred == null || preloadPred.apply(info))
+                                        s.addEntry0(part, info, cctx);
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Rebalance predicate evaluated to false (will not send " +
+                                                "cache entry): " + info);
+
+                                        continue;
+                                    }
+
+                                    // Need to manually prepare cache message.
+                                    if (depEnabled && !prepared) {
+                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+                                            swapEntry.valueClassLoaderId() != null ?
+                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+                                                null;
+
+                                        if (ldr == null)
+                                            continue;
+
+                                        if (ldr instanceof GridDeploymentInfo) {
+                                            s.prepare((GridDeploymentInfo)ldr);
+
+                                            prepared = true;
+                                        }
+                                    }
+                                }
+
+                                if (partMissing)
+                                    continue;
+                            }
+                            finally {
+                                iter.close();
+                            }
+                        }
+                    }
+
+                    if (swapLsnr == null && sctx != null)
+                        swapLsnr = sctx.swapLsnr;
+
+                    // Stop receiving promote notifications.
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+
+                    if (phase == 2)
+                        phase = 3;
+
+                    if (phase == 3 && swapLsnr != null) {
+                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+                        swapLsnr = null;
+
+                        Iterator<GridCacheEntryInfo> lsnrIt = sctx != null ?
+                            (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();
+
+                        while (lsnrIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition,
+                                // so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition " +
+                                        "[part=" + part + ", nodeId=" + id + ']');
+
+                                // No need to continue iteration over swap entries.
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (!reply(node, d, s))
+                                    return;
+
+                                // Throttle preloading.
+                                if (preloadThrottle > 0)
+                                    U.sleep(preloadThrottle);
+
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
+
+                                    return;
+                                }
+                                else {
+                                    s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+                                        cctx.cacheId(), d.topologyVersion());
+                                }
+                            }
+
+                            GridCacheEntryInfo info = lsnrIt.next();
+
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    // Mark as last supply message.
+                    s.last(part);
+
+                    phase = 0;
+
+                    sctx = null;
+                }
+                finally {
+                    loc.release();
+
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+                }
+            }
+
+            reply(node, d, s);
+
+            doneMap.put(scId, true);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send partition supply message to node: " + id, e);
+        }
+    }
+
+    /**
+     * @param n Node.
+     * @param d DemandMessage
+     * @param s Supply message.
+     * @return {@code True} if message was sent, {@code false} if recipient left grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessageV2 s)
+        throws IgniteCheckedException {
+
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+            return true;
+        }
+        catch (ClusterTopologyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+            return false;
+        }
+    }
+
+    /**
+     * @param t Tuple.
+     * @param phase Phase.
+     * @param partIt Partition it.
+     * @param part Partition.
+     * @param entryIt Entry it.
+     * @param swapLsnr Swap listener.
+     */
+    private void saveSupplyContext(
+        T2 t,
+        int phase,
+        Iterator<Integer> partIt,
+        int part,
+        Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr) {
+        scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
+    }
+
+    /**
+     * Supply context.
+     */
+    private static class SupplyContext {
+        /** Phase. */
+        private int phase;
+
+        /** Partition iterator. */
+        private Iterator<Integer> partIt;
+
+        /** Entry iterator. */
+        private Iterator<?> entryIt;
+
+        /** Swap listener. */
+        private GridCacheEntryInfoCollectSwapListener swapLsnr;
+
+        /** Partition. */
+        int part;
+
+        /**
+         * @param phase Phase.
+         * @param partIt Partition iterator.
+         * @param entryIt Entry iterator.
+         * @param swapLsnr Swap listener.
+         * @param part Partition.
+         */
+        public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
+            GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
+            this.phase = phase;
+            this.partIt = partIt;
+            this.entryIt = entryIt;
+            this.swapLsnr = swapLsnr;
+            this.part = part;
+        }
+    }
+
+    @Deprecated//Backward compatibility. To be removed in future.
+    public void startOldListeners() {
+        if (!cctx.kernalContext().clientNode()) {
+            int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
+
+            cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
+                @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+                    processOldDemandMessage(m, id);
+                }
+            });
+        }
+    }
+
+    /**
+     * @param d D.
+     * @param id Id.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) {
+        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
+            d.updateSequence(), cctx.cacheId());
+
+        ClusterNode node = cctx.node(id);
+
+        long preloadThrottle = cctx.config().getRebalanceThrottle();
+
+        boolean ack = false;
+
+        try {
+            for (int part : d.partitions()) {
+                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+
+                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                    // Reply with partition of "-1" to let sender know that
+                    // this node is no longer an owner.
+                    s.missed(part);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Requested partition is not owned by local node [part=" + part +
+                            ", demander=" + id + ']');
+
+                    continue;
+                }
+
+                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+                try {
+                    if (cctx.isSwapOrOffheapEnabled()) {
+                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+                        cctx.swap().addOffHeapListener(part, swapLsnr);
+                        cctx.swap().addSwapListener(part, swapLsnr);
+                    }
+
+                    boolean partMissing = false;
+
+                    for (GridCacheEntryEx e : loc.entries()) {
+                        if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                            // Demander no longer needs this partition, so we send '-1' partition and move on.
+                            s.missed(part);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Demanding node does not need requested partition [part=" + part +
+                                    ", nodeId=" + id + ']');
+
+                            partMissing = true;
+
+                            break;
+                        }
+
+                        if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                            ack = true;
+
+                            if (!replyOld(node, d, s))
+                                return;
+
+                            // Throttle preloading.
+                            if (preloadThrottle > 0)
+                                U.sleep(preloadThrottle);
+
+                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                cctx.cacheId());
+                        }
+
+                        GridCacheEntryInfo info = e.info();
+
+                        if (info != null && !info.isNew()) {
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    if (partMissing)
+                        continue;
+
+                    if (cctx.isSwapOrOffheapEnabled()) {
+                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+                            cctx.swap().iterator(part);
+
+                        // Iterator may be null if space does not exist.
+                        if (iter != null) {
+                            try {
+                                boolean prepared = false;
+
+                                for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
+                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                        // Demander no longer needs this partition,
+                                        // so we send '-1' partition and move on.
+                                        s.missed(part);
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Demanding node does not need requested partition " +
+                                                "[part=" + part + ", nodeId=" + id + ']');
+
+                                        partMissing = true;
+
+                                        break; // For.
+                                    }
+
+                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                        ack = true;
+
+                                        if (!replyOld(node, d, s))
+                                            return;
+
+                                        // Throttle preloading.
+                                        if (preloadThrottle > 0)
+                                            U.sleep(preloadThrottle);
+
+                                        s = new GridDhtPartitionSupplyMessage(d.workerId(),
+                                            d.updateSequence(), cctx.cacheId());
+                                    }
+
+                                    GridCacheSwapEntry swapEntry = e.getValue();
+
+                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                                    info.keyBytes(e.getKey());
+                                    info.ttl(swapEntry.ttl());
+                                    info.expireTime(swapEntry.expireTime());
+                                    info.version(swapEntry.version());
+                                    info.value(swapEntry.value());
+
+                                    if (preloadPred == null || preloadPred.apply(info))
+                                        s.addEntry0(part, info, cctx);
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Rebalance predicate evaluated to false (will not send " +
+                                                "cache entry): " + info);
+
+                                        continue;
+                                    }
+
+                                    // Need to manually prepare cache message.
+                                    if (depEnabled && !prepared) {
+                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+                                            swapEntry.valueClassLoaderId() != null ?
+                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+                                                null;
+
+                                        if (ldr == null)
+                                            continue;
+
+                                        if (ldr instanceof GridDeploymentInfo) {
+                                            s.prepare((GridDeploymentInfo)ldr);
+
+                                            prepared = true;
+                                        }
+                                    }
+                                }
+
+                                if (partMissing)
+                                    continue;
+                            }
+                            finally {
+                                iter.close();
+                            }
+                        }
+                    }
+
+                    // Stop receiving promote notifications.
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+
+                    if (swapLsnr != null) {
+                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+                        swapLsnr = null;
+
+                        for (GridCacheEntryInfo info : entries) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition,
+                                // so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition " +
+                                        "[part=" + part + ", nodeId=" + id + ']');
+
+                                // No need to continue iteration over swap entries.
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                ack = true;
+
+                                if (!replyOld(node, d, s))
+                                    return;
+
+                                s = new GridDhtPartitionSupplyMessage(d.workerId(),
+                                    d.updateSequence(),
+                                    cctx.cacheId());
+                            }
+
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    // Mark as last supply message.
+                    s.last(part);
+
+                    if (ack) {
+                        s.markAck();
+
+                        break; // Partition for loop.
+                    }
+                }
+                finally {
+                    loc.release();
+
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+                }
+            }
+
+            replyOld(node, d, s);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
+        }
+    }
+
+    /**
+     * @param n Node.
+     * @param d Demand message.
+     * @param s Supply message.
+     * @return {@code True} if message was sent, {@code false} if recipient left grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+        throws IgniteCheckedException {
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+            return true;
+        }
+        catch (ClusterTopologyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+            return false;
+        }
+    }
+}