Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/09 11:15:29 UTC

[35/50] [abbrv] ignite git commit: Ignite-1093 "Rebalancing with default parameters is very slow" fixes.
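
The files below replace the per-cache supply thread pool with a GridDhtPartitionSupplier that answers demand
messages directly and may send several message batches per demand request (see getRebalanceBatchesPrefetchCount()
in the new code). For readers who want to experiment with the knobs this code reads, here is a minimal
configuration sketch; the concrete values are illustrative assumptions only, not recommendations made by this commit:

    import org.apache.ignite.Ignite;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.configuration.CacheConfiguration;
    import org.apache.ignite.configuration.IgniteConfiguration;

    public class RebalanceTuningExample {
        public static void main(String[] args) {
            CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>("example");

            // Target size of a single supply message, in bytes (getRebalanceBatchSize()).
            ccfg.setRebalanceBatchSize(512 * 1024);

            // How many batches a supplier may generate per demand request before parking its
            // context and waiting for the next request (getRebalanceBatchesPrefetchCount()).
            ccfg.setRebalanceBatchesPrefetchCount(2);

            // Artificial delay between supply messages; 0 disables throttling (getRebalanceThrottle()).
            ccfg.setRebalanceThrottle(0);

            // Number of threads (and ordered topics) used for rebalancing this cache.
            ccfg.setRebalanceThreadPoolSize(2);

            IgniteConfiguration cfg = new IgniteConfiguration().setCacheConfiguration(ccfg);

            try (Ignite ignite = Ignition.start(cfg)) {
                System.out.println("Started node: " + ignite.name());
            }
        }
    }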

http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
new file mode 100644
index 0000000..865bad8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -0,0 +1,1034 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.IgniteSpiException;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+
+/**
+ * Supplies partitions to demanding nodes during rebalancing.
+ */
+class GridDhtPartitionSupplier {
+    /** */
+    private final GridCacheContext<?, ?> cctx;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private GridDhtPartitionTopology top;
+
+    /** */
+    private final boolean depEnabled;
+
+    /** Preload predicate. */
+    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+    /** Supply context map. T3: node ID, index, topology version. */
+    private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>();
+
+    /**
+     * @param cctx Cache context.
+     */
+    GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) {
+        assert cctx != null;
+
+        this.cctx = cctx;
+
+        log = cctx.logger(getClass());
+
+        top = cctx.dht().topology();
+
+        depEnabled = cctx.gridDeploy().enabled();
+    }
+
+    /**
+     *
+     */
+    void start() {
+        startOldListeners();
+    }
+
+    /**
+     *
+     */
+    void stop() {
+        synchronized (scMap) {
+            Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator();
+
+            while (it.hasNext()) {
+                T3<UUID, Integer, AffinityTopologyVersion> t = it.next();
+
+                clearContext(scMap.get(t), log);
+
+                it.remove();
+            }
+        }
+
+        stopOldListeners();
+    }
+
+    /**
+     * Clears supply context: closes the entry iterator, if any, and releases the reserved partition.
+     *
+     * @param sc Supply context.
+     * @param log Logger.
+     */
+    private static void clearContext(
+        final SupplyContext sc,
+        final IgniteLogger log) {
+        if (sc != null) {
+            final Iterator it = sc.entryIt;
+
+            if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) {
+                try {
+                    ((GridCloseableIterator)it).close();
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Iterator close failed.", e);
+                }
+            }
+
+            final GridDhtLocalPartition loc = sc.loc;
+
+            if (loc != null) {
+                assert loc.reservations() > 0;
+
+                loc.release();
+            }
+        }
+    }
+
+    /**
+     * Handles new topology.
+     *
+     * @param topVer Topology version.
+     */
+    public void onTopologyChanged(AffinityTopologyVersion topVer) {
+        synchronized (scMap) {
+            Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator();
+
+            while (it.hasNext()) {
+                T3<UUID, Integer, AffinityTopologyVersion> t = it.next();
+
+                if (topVer.compareTo(t.get3()) > 0) { // Clear all obsolete contexts.
+                    clearContext(scMap.get(t), log);
+
+                    it.remove();
+
+                    if (log.isDebugEnabled())
+                        log.debug("Supply context removed [node=" + t.get1() + "]");
+                }
+            }
+        }
+    }
+
+    /**
+     * Sets preload predicate for supply pool.
+     *
+     * @param preloadPred Preload predicate.
+     */
+    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+        this.preloadPred = preloadPred;
+    }
+
+    /**
+     * @param idx Index.
+     * @param id Node UUID.
+     * @param d Demand message.
+     */
+    public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
+        assert d != null;
+        assert id != null;
+
+        AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion demTop = d.topologyVersion();
+
+        T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, demTop);
+
+        if (d.updateSequence() == -1) { // Demand node requested context cleanup.
+            synchronized (scMap) {
+                clearContext(scMap.remove(scId), log);
+
+                return;
+            }
+        }
+
+        if (cutTop.compareTo(demTop) > 0) {
+            if (log.isDebugEnabled())
+                log.debug("Demand request cancelled [current=" + cutTop + ", demanded=" + demTop +
+                    ", from=" + id + ", idx=" + idx + "]");
+
+            return;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Demand request accepted [current=" + cutTop + ", demanded=" + demTop +
+                ", from=" + id + ", idx=" + idx + "]");
+
+        GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(
+            d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
+
+        ClusterNode node = cctx.discovery().node(id);
+
+        if (node == null)
+            return; //Context will be cleaned at topology change.
+
+        try {
+            SupplyContext sctx;
+
+            synchronized (scMap) {
+                sctx = scMap.remove(scId);
+
+                assert sctx == null || d.updateSequence() == sctx.updateSeq;
+            }
+
+            // Initial demand request should contain partitions list.
+            if (sctx == null && d.partitions() == null)
+                return;
+
+            assert !(sctx != null && d.partitions() != null);
+
+            long bCnt = 0;
+
+            SupplyContextPhase phase = SupplyContextPhase.NEW;
+
+            boolean newReq = true;
+
+            long maxBatchesCnt = cctx.config().getRebalanceBatchesPrefetchCount();
+
+            if (sctx != null) {
+                phase = sctx.phase;
+
+                maxBatchesCnt = 1;
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Starting supplying rebalancing [cache=" + cctx.name() +
+                        ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
+                        ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
+                        ", idx=" + idx + "]");
+            }
+
+            Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
+
+            while ((sctx != null && newReq) || partIt.hasNext()) {
+                int part = sctx != null && newReq ? sctx.part : partIt.next();
+
+                newReq = false;
+
+                GridDhtLocalPartition loc;
+
+                if (sctx != null && sctx.loc != null) {
+                    loc = sctx.loc;
+
+                    assert loc.reservations() > 0;
+                }
+                else {
+                    loc = top.localPartition(part, d.topologyVersion(), false);
+
+                    if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                        // Reply with partition of "-1" to let sender know that
+                        // this node is no longer an owner.
+                        s.missed(part);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Requested partition is not owned by local node [part=" + part +
+                                ", demander=" + id + ']');
+
+                        continue;
+                    }
+                }
+
+                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+                try {
+                    if (phase == SupplyContextPhase.NEW && cctx.isSwapOrOffheapEnabled()) {
+                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+                        cctx.swap().addOffHeapListener(part, swapLsnr);
+                        cctx.swap().addSwapListener(part, swapLsnr);
+                    }
+
+                    boolean partMissing = false;
+
+                    if (phase == SupplyContextPhase.NEW)
+                        phase = SupplyContextPhase.ONHEAP;
+
+                    if (phase == SupplyContextPhase.ONHEAP) {
+                        Iterator<GridDhtCacheEntry> entIt = sctx != null ?
+                            (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();
+
+                        while (entIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition, so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition [part=" + part +
+                                        ", nodeId=" + id + ']');
+
+                                partMissing = true;
+
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId,
+                                        phase,
+                                        partIt,
+                                        part,
+                                        entIt,
+                                        swapLsnr,
+                                        loc,
+                                        d.topologyVersion(),
+                                        d.updateSequence());
+
+                                    swapLsnr = null;
+                                    loc = null;
+
+                                    reply(node, d, s, scId);
+
+                                    return;
+                                }
+                                else {
+                                    if (!reply(node, d, s, scId))
+                                        return;
+
+                                    s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+                                        cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
+                                }
+                            }
+
+                            GridCacheEntryEx e = entIt.next();
+
+                            GridCacheEntryInfo info = e.info();
+
+                            if (info != null && !info.isNew()) {
+                                if (preloadPred == null || preloadPred.apply(info))
+                                    s.addEntry(part, info, cctx);
+                                else if (log.isDebugEnabled())
+                                    log.debug("Rebalance predicate evaluated to false (will not send cache entry): " +
+                                        info);
+                            }
+                        }
+
+                        if (partMissing)
+                            continue;
+
+                    }
+
+                    if (phase == SupplyContextPhase.ONHEAP) {
+                        phase = SupplyContextPhase.SWAP;
+
+                        if (sctx != null) {
+                            sctx = new SupplyContext(
+                                phase,
+                                partIt,
+                                null,
+                                swapLsnr,
+                                part,
+                                loc,
+                                d.updateSequence());
+                        }
+                    }
+
+                    if (phase == SupplyContextPhase.SWAP && cctx.isSwapOrOffheapEnabled()) {
+                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+                            sctx != null && sctx.entryIt != null ?
+                                (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
+                                cctx.swap().iterator(part);
+
+                        // Iterator may be null if space does not exist.
+                        if (iter != null) {
+                            boolean prepared = false;
+
+                            while (iter.hasNext()) {
+                                if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                    // Demander no longer needs this partition,
+                                    // so we send '-1' partition and move on.
+                                    s.missed(part);
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Demanding node does not need requested partition " +
+                                            "[part=" + part + ", nodeId=" + id + ']');
+
+                                    partMissing = true;
+
+                                    break; // For.
+                                }
+
+                                if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                    if (++bCnt >= maxBatchesCnt) {
+                                        saveSupplyContext(scId,
+                                            phase,
+                                            partIt,
+                                            part,
+                                            iter,
+                                            swapLsnr,
+                                            loc,
+                                            d.topologyVersion(),
+                                            d.updateSequence());
+
+                                        swapLsnr = null;
+                                        loc = null;
+
+                                        reply(node, d, s, scId);
+
+                                        return;
+                                    }
+                                    else {
+                                        if (!reply(node, d, s, scId))
+                                            return;
+
+                                        s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+                                            cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
+                                    }
+                                }
+
+                                Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
+
+                                GridCacheSwapEntry swapEntry = e.getValue();
+
+                                GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                                info.keyBytes(e.getKey());
+                                info.ttl(swapEntry.ttl());
+                                info.expireTime(swapEntry.expireTime());
+                                info.version(swapEntry.version());
+                                info.value(swapEntry.value());
+
+                                if (preloadPred == null || preloadPred.apply(info))
+                                    s.addEntry0(part, info, cctx);
+                                else {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Rebalance predicate evaluated to false (will not send " +
+                                            "cache entry): " + info);
+
+                                    continue;
+                                }
+
+                                // Need to manually prepare cache message.
+                                if (depEnabled && !prepared) {
+                                    ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+                                        cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+                                        swapEntry.valueClassLoaderId() != null ?
+                                            cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+                                            null;
+
+                                    if (ldr == null)
+                                        continue;
+
+                                    if (ldr instanceof GridDeploymentInfo) {
+                                        s.prepare((GridDeploymentInfo)ldr);
+
+                                        prepared = true;
+                                    }
+                                }
+                            }
+
+                            iter.close();
+
+                            if (partMissing)
+                                continue;
+                        }
+                    }
+
+                    if (swapLsnr == null && sctx != null)
+                        swapLsnr = sctx.swapLsnr;
+
+                    // Stop receiving promote notifications.
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+
+                    if (phase == SupplyContextPhase.SWAP) {
+                        phase = SupplyContextPhase.EVICTED;
+
+                        if (sctx != null) {
+                            sctx = new SupplyContext(
+                                phase,
+                                partIt,
+                                null,
+                                null,
+                                part,
+                                loc,
+                                d.updateSequence());
+                        }
+                    }
+
+                    if (phase == SupplyContextPhase.EVICTED && swapLsnr != null) {
+                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+                        swapLsnr = null;
+
+                        Iterator<GridCacheEntryInfo> lsnrIt = sctx != null && sctx.entryIt != null ?
+                            (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();
+
+                        while (lsnrIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition,
+                                // so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition " +
+                                        "[part=" + part + ", nodeId=" + id + ']');
+
+                                // No need to continue iteration over swap entries.
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId,
+                                        phase,
+                                        partIt,
+                                        part,
+                                        lsnrIt,
+                                        swapLsnr,
+                                        loc,
+                                        d.topologyVersion(),
+                                        d.updateSequence());
+
+                                    loc = null;
+
+                                    reply(node, d, s, scId);
+
+                                    return;
+                                }
+                                else {
+                                    if (!reply(node, d, s, scId))
+                                        return;
+
+                                    s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+                                        cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
+                                }
+                            }
+
+                            GridCacheEntryInfo info = lsnrIt.next();
+
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not send cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    // Mark as last supply message.
+                    s.last(part);
+
+                    phase = SupplyContextPhase.NEW;
+
+                    sctx = null;
+                }
+                finally {
+                    if (loc != null)
+                        loc.release();
+
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+                }
+            }
+
+            reply(node, d, s, scId);
+
+            if (log.isDebugEnabled())
+                log.debug("Finished supplying rebalancing [cache=" + cctx.name() +
+                    ", fromNode=" + node.id() +
+                    ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
+                    ", idx=" + idx + "]");
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send partition supply message to node: " + id, e);
+        }
+        catch (IgniteSpiException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send message to node (current node is stopping?) [node=" + node.id() +
+                    ", msg=" + e.getMessage() + ']');
+        }
+    }
+
+    /**
+     * @param n Node.
+     * @param d Demand message.
+     * @param s Supply message.
+     * @param scId Supply context id.
+     * @return {@code True} if message was sent, {@code false} if recipient left grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean reply(ClusterNode n,
+        GridDhtPartitionDemandMessage d,
+        GridDhtPartitionSupplyMessageV2 s,
+        T3<UUID, Integer, AffinityTopologyVersion> scId)
+        throws IgniteCheckedException {
+
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+            // Throttle preloading.
+            if (cctx.config().getRebalanceThrottle() > 0)
+                U.sleep(cctx.config().getRebalanceThrottle());
+
+            return true;
+        }
+        catch (ClusterTopologyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+            synchronized (scMap) {
+                clearContext(scMap.remove(scId), log);
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * @param t Supply context key (node ID, index, topology version).
+     * @param phase Phase.
+     * @param partIt Partition iterator.
+     * @param part Partition.
+     * @param entryIt Entry iterator.
+     * @param swapLsnr Swap listener.
+     * @param loc Local partition.
+     * @param topVer Topology version.
+     * @param updateSeq Update sequence.
+     */
+    private void saveSupplyContext(
+        T3<UUID, Integer, AffinityTopologyVersion> t,
+        SupplyContextPhase phase,
+        Iterator<Integer> partIt,
+        int part,
+        Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr,
+        GridDhtLocalPartition loc,
+        AffinityTopologyVersion topVer,
+        long updateSeq) {
+        synchronized (scMap) {
+            if (cctx.affinity().affinityTopologyVersion().equals(topVer)) {
+                assert scMap.get(t) == null;
+
+                scMap.put(t,
+                    new SupplyContext(phase,
+                        partIt,
+                        entryIt,
+                        swapLsnr,
+                        part,
+                        loc,
+                        updateSeq));
+            }
+            else if (loc != null) {
+                assert loc.reservations() > 0;
+
+                loc.release();
+            }
+        }
+    }
+
+    /**
+     * Supply context phase.
+     */
+    private enum SupplyContextPhase {
+        NEW,
+        ONHEAP,
+        SWAP,
+        EVICTED
+    }
+
+    /**
+     * Supply context.
+     */
+    private static class SupplyContext {
+        /** Phase. */
+        private final SupplyContextPhase phase;
+
+        /** Partition iterator. */
+        private final Iterator<Integer> partIt;
+
+        /** Entry iterator. */
+        private final Iterator<?> entryIt;
+
+        /** Swap listener. */
+        private final GridCacheEntryInfoCollectSwapListener swapLsnr;
+
+        /** Partition. */
+        private final int part;
+
+        /** Local partition. */
+        private final GridDhtLocalPartition loc;
+
+        /** Update seq. */
+        private final long updateSeq;
+
+        /**
+         * @param phase Phase.
+         * @param partIt Partition iterator.
+         * @param entryIt Entry iterator.
+         * @param swapLsnr Swap listener.
+         * @param part Partition.
+         * @param loc Local partition.
+         * @param updateSeq Update sequence.
+         */
+        public SupplyContext(SupplyContextPhase phase,
+            Iterator<Integer> partIt,
+            Iterator<?> entryIt,
+            GridCacheEntryInfoCollectSwapListener swapLsnr,
+            int part,
+            GridDhtLocalPartition loc,
+            long updateSeq) {
+            this.phase = phase;
+            this.partIt = partIt;
+            this.entryIt = entryIt;
+            this.swapLsnr = swapLsnr;
+            this.part = part;
+            this.loc = loc;
+            this.updateSeq = updateSeq;
+        }
+    }
+
+    @Deprecated // Backward compatibility. To be removed in future.
+    public void startOldListeners() {
+        if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) {
+            cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
+                @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+                    processOldDemandMessage(m, id);
+                }
+            });
+        }
+    }
+
+    @Deprecated // Backward compatibility. To be removed in future.
+    public void stopOldListeners() {
+        if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) {
+
+            cctx.io().removeHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class);
+        }
+    }
+
+    /**
+     * @param d Demand message.
+     * @param id Node ID.
+     */
+    @Deprecated // Backward compatibility. To be removed in future.
+    private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) {
+        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
+            d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled());
+
+        ClusterNode node = cctx.node(id);
+
+        long preloadThrottle = cctx.config().getRebalanceThrottle();
+
+        boolean ack = false;
+
+        try {
+            for (int part : d.partitions()) {
+                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+
+                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                    // Reply with partition of "-1" to let sender know that
+                    // this node is no longer an owner.
+                    s.missed(part);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Requested partition is not owned by local node [part=" + part +
+                            ", demander=" + id + ']');
+
+                    continue;
+                }
+
+                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+                try {
+                    if (cctx.isSwapOrOffheapEnabled()) {
+                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+                        cctx.swap().addOffHeapListener(part, swapLsnr);
+                        cctx.swap().addSwapListener(part, swapLsnr);
+                    }
+
+                    boolean partMissing = false;
+
+                    for (GridCacheEntryEx e : loc.entries()) {
+                        if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                            // Demander no longer needs this partition, so we send '-1' partition and move on.
+                            s.missed(part);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Demanding node does not need requested partition [part=" + part +
+                                    ", nodeId=" + id + ']');
+
+                            partMissing = true;
+
+                            break;
+                        }
+
+                        if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                            ack = true;
+
+                            if (!replyOld(node, d, s))
+                                return;
+
+                            // Throttle preloading.
+                            if (preloadThrottle > 0)
+                                U.sleep(preloadThrottle);
+
+                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                cctx.cacheId(), cctx.deploymentEnabled());
+                        }
+
+                        GridCacheEntryInfo info = e.info();
+
+                        if (info != null && !info.isNew()) {
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not send cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    if (partMissing)
+                        continue;
+
+                    if (cctx.isSwapOrOffheapEnabled()) {
+                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+                            cctx.swap().iterator(part);
+
+                        // Iterator may be null if space does not exist.
+                        if (iter != null) {
+                            try {
+                                boolean prepared = false;
+
+                                for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
+                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                        // Demander no longer needs this partition,
+                                        // so we send '-1' partition and move on.
+                                        s.missed(part);
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Demanding node does not need requested partition " +
+                                                "[part=" + part + ", nodeId=" + id + ']');
+
+                                        partMissing = true;
+
+                                        break; // For.
+                                    }
+
+                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                        ack = true;
+
+                                        if (!replyOld(node, d, s))
+                                            return;
+
+                                        // Throttle preloading.
+                                        if (preloadThrottle > 0)
+                                            U.sleep(preloadThrottle);
+
+                                        s = new GridDhtPartitionSupplyMessage(d.workerId(),
+                                            d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled());
+                                    }
+
+                                    GridCacheSwapEntry swapEntry = e.getValue();
+
+                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                                    info.keyBytes(e.getKey());
+                                    info.ttl(swapEntry.ttl());
+                                    info.expireTime(swapEntry.expireTime());
+                                    info.version(swapEntry.version());
+                                    info.value(swapEntry.value());
+
+                                    if (preloadPred == null || preloadPred.apply(info))
+                                        s.addEntry0(part, info, cctx);
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Rebalance predicate evaluated to false (will not send " +
+                                                "cache entry): " + info);
+
+                                        continue;
+                                    }
+
+                                    // Need to manually prepare cache message.
+                                    if (depEnabled && !prepared) {
+                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+                                            swapEntry.valueClassLoaderId() != null ?
+                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+                                                null;
+
+                                        if (ldr == null)
+                                            continue;
+
+                                        if (ldr instanceof GridDeploymentInfo) {
+                                            s.prepare((GridDeploymentInfo)ldr);
+
+                                            prepared = true;
+                                        }
+                                    }
+                                }
+
+                                if (partMissing)
+                                    continue;
+                            }
+                            finally {
+                                iter.close();
+                            }
+                        }
+                    }
+
+                    // Stop receiving promote notifications.
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+
+                    if (swapLsnr != null) {
+                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+                        swapLsnr = null;
+
+                        for (GridCacheEntryInfo info : entries) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition,
+                                // so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition " +
+                                        "[part=" + part + ", nodeId=" + id + ']');
+
+                                // No need to continue iteration over swap entries.
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                ack = true;
+
+                                if (!replyOld(node, d, s))
+                                    return;
+
+                                s = new GridDhtPartitionSupplyMessage(d.workerId(),
+                                    d.updateSequence(),
+                                    cctx.cacheId(),
+                                    cctx.deploymentEnabled());
+                            }
+
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not send cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    // Mark as last supply message.
+                    s.last(part);
+
+                    if (ack) {
+                        s.markAck();
+
+                        break; // Partition for loop.
+                    }
+                }
+                finally {
+                    loc.release();
+
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+                }
+            }
+
+            replyOld(node, d, s);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
+        }
+    }
+
+    /**
+     * @param n Node.
+     * @param d Demand message.
+     * @param s Supply message.
+     * @return {@code True} if message was sent, {@code false} if recipient left grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Deprecated // Backward compatibility. To be removed in future.
+    private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+        throws IgniteCheckedException {
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+            return true;
+        }
+        catch (ClusterTopologyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+            return false;
+        }
+    }
+
+    /**
+     * Dumps debug information.
+     */
+    public void dumpDebugInfo() {
+        synchronized (scMap) {
+            if (!scMap.isEmpty()) {
+                U.warn(log, "Rebalancing supplier reserved the following partitions:");
+
+                for (SupplyContext sc : scMap.values()) {
+                    if (sc.loc != null)
+                        U.warn(log, ">>> " + sc.loc);
+                }
+            }
+        }
+    }
+}
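
handleDemandMessage() above interleaves three iterators (partitions, on-heap entries, swap entries) and, once the
batch/prefetch limit is hit, parks them in scMap so the next demand message resumes where supplying stopped. A
much-simplified, hypothetical sketch of that save/resume idea (names and types below are illustrative, not part of
the Ignite API) is:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    /** Illustrative sketch of the "save context, resume on next request" idea. */
    public class ResumableSupplier {
        /** Saved iteration state per demander (rough analogue of scMap/SupplyContext). */
        private final Map<String, Iterator<Integer>> contexts = new HashMap<>();

        /**
         * Returns at most {@code batchSize} items per call; the position is remembered
         * so the next call for the same demander continues where the previous one stopped.
         */
        public synchronized List<Integer> supply(String demanderId, List<Integer> data, int batchSize) {
            Iterator<Integer> it = contexts.remove(demanderId);

            if (it == null)
                it = data.iterator(); // First request: start a fresh iteration.

            List<Integer> batch = new ArrayList<>();

            while (it.hasNext()) {
                batch.add(it.next());

                if (batch.size() >= batchSize) {
                    contexts.put(demanderId, it); // Park the iterator; resume on the next demand.

                    return batch;
                }
            }

            return batch; // Iteration finished; nothing left to save.
        }
    }

In the real supplier the saved context is additionally keyed by (node ID, index, topology version) and obsolete
entries are dropped in onTopologyChanged(), so a demander that re-requests under a newer topology never resumes a
stale iteration.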

http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
new file mode 100644
index 0000000..41454f9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -0,0 +1,380 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Partition supply message.
+ */
+public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements GridCacheDeployable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Update sequence. */
+    private long updateSeq;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Partitions that have been fully sent. */
+    @GridDirectCollection(int.class)
+    private Collection<Integer> last;
+
+    /** Partitions which were not found. */
+    @GridToStringInclude
+    @GridDirectCollection(int.class)
+    private Collection<Integer> missed;
+
+    /** Entries. */
+    @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
+    private Map<Integer, CacheEntryInfoCollection> infos;
+
+    /** Message size. */
+    @GridDirectTransient
+    private int msgSize;
+
+    /**
+     * @param updateSeq Update sequence for this node.
+     * @param cacheId Cache ID.
+     * @param topVer Topology version.
+     * @param addDepInfo Deployment info flag.
+     */
+    GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer, boolean addDepInfo) {
+        this.cacheId = cacheId;
+        this.updateSeq = updateSeq;
+        this.topVer = topVer;
+        this.addDepInfo = addDepInfo;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridDhtPartitionSupplyMessageV2() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean ignoreClassErrors() {
+        return true;
+    }
+
+    /**
+     * @return Update sequence.
+     */
+    long updateSequence() {
+        return updateSeq;
+    }
+
+    /**
+     * @return Topology version for which this supply message is sent.
+     */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Partitions for which this is the last supply message.
+     */
+    Collection<Integer> last() {
+        return last == null ? Collections.<Integer>emptySet() : last;
+    }
+
+    /**
+     * @param p Partition which was fully sent.
+     */
+    void last(int p) {
+        if (last == null)
+            last = new HashSet<>();
+
+        if (last.add(p)) {
+            msgSize += 4;
+
+            // If partition is empty, we need to add it.
+            if (!infos().containsKey(p)) {
+                CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection();
+
+                infoCol.init();
+
+                infos().put(p, infoCol);
+            }
+        }
+    }
+
+    /**
+     * @param p Missed partition.
+     */
+    void missed(int p) {
+        if (missed == null)
+            missed = new HashSet<>();
+
+        if (missed.add(p))
+            msgSize += 4;
+    }
+
+    /**
+     * @return Missed partitions.
+     */
+    Collection<Integer> missed() {
+        return missed == null ? Collections.<Integer>emptySet() : missed;
+    }
+
+    /**
+     * @return Entries.
+     */
+    Map<Integer, CacheEntryInfoCollection> infos() {
+        if (infos == null)
+            infos = new HashMap<>();
+
+        return infos;
+    }
+
+    /**
+     * @return Message size.
+     */
+    int messageSize() {
+        return msgSize;
+    }
+
+    /**
+     * @param p Partition.
+     * @param info Entry to add.
+     * @param ctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+        assert info != null;
+
+        marshalInfo(info, ctx);
+
+        msgSize += info.marshalledSize(ctx);
+
+        CacheEntryInfoCollection infoCol = infos().get(p);
+
+        if (infoCol == null) {
+            msgSize += 4;
+
+            infos().put(p, infoCol = new CacheEntryInfoCollection());
+
+            infoCol.init();
+        }
+
+        infoCol.add(info);
+    }
+
+    /**
+     * @param p Partition.
+     * @param info Entry to add.
+     * @param ctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+        assert info != null;
+        assert (info.key() != null || info.keyBytes() != null);
+        assert info.value() != null;
+
+        // Need to call this method to initialize info properly.
+        marshalInfo(info, ctx);
+
+        msgSize += info.marshalledSize(ctx);
+
+        CacheEntryInfoCollection infoCol = infos().get(p);
+
+        if (infoCol == null) {
+            msgSize += 4;
+
+            infos().put(p, infoCol = new CacheEntryInfoCollection());
+
+            infoCol.init();
+        }
+
+        infoCol.add(info);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
+
+        for (CacheEntryInfoCollection col : infos().values()) {
+            List<GridCacheEntryInfo> entries = col.infos();
+
+            for (int i = 0; i < entries.size(); i++)
+                entries.get(i).unmarshal(cacheCtx, ldr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /**
+     * @return Number of entries in message.
+     */
+    public int size() {
+        return infos().size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeLong("updateSeq", updateSeq))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                last = reader.readCollection("last", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                missed = reader.readCollection("missed", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                updateSeq = reader.readLong("updateSeq");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridDhtPartitionSupplyMessageV2.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 114;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 8;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtPartitionSupplyMessageV2.class, this,
+            "size", size(),
+            "parts", infos().keySet(),
+            "super", super.toString());
+    }
+}
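
writeTo()/readFrom() above follow the direct-marshalling pattern used by Ignite messages: field cases start at 3
because the superclass serializes its own fields first, every write returns false when the target buffer is full,
and the writer/reader state lets a later call continue from the unfinished field. A stripped-down, hypothetical
illustration of that resumable state machine (not the actual MessageWriter API) is:

    import java.nio.ByteBuffer;

    /** Hypothetical sketch of a resumable field-by-field writer (not the actual MessageWriter API). */
    public class ResumableWriter {
        /** Index of the next field to write. */
        private int state;

        private final long updateSeq;
        private final long topVerMajor;

        public ResumableWriter(long updateSeq, long topVerMajor) {
            this.updateSeq = updateSeq;
            this.topVerMajor = topVerMajor;
        }

        /** Returns false when the buffer runs out of space; call again with a fresh buffer to resume. */
        public boolean writeTo(ByteBuffer buf) {
            switch (state) {
                case 0:
                    if (buf.remaining() < 8)
                        return false;

                    buf.putLong(updateSeq);
                    state++;
                    // Fall through to the next field, like writer.incrementState().

                case 1:
                    if (buf.remaining() < 8)
                        return false;

                    buf.putLong(topVerMajor);
                    state++;
            }

            return true; // All fields written.
        }

        public static void main(String[] args) {
            ResumableWriter w = new ResumableWriter(42L, 7L);

            ByteBuffer small = ByteBuffer.allocate(8);  // Room for the first field only.
            System.out.println(w.writeTo(small));       // false: buffer filled after field 0.

            ByteBuffer rest = ByteBuffer.allocate(8);
            System.out.println(w.writeTo(rest));        // true: resumed at field 1 and finished.
        }
    }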

http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
deleted file mode 100644
index 28a73b1..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ /dev/null
@@ -1,555 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.io.Externalizable;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.locks.ReadWriteLock;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener;
-import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.util.lang.GridCloseableIterator;
-import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.Nullable;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
-
-/**
- * Thread pool for supplying partitions to demanding nodes.
- */
-class GridDhtPartitionSupplyPool {
-    /** */
-    private final GridCacheContext<?, ?> cctx;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private final ReadWriteLock busyLock;
-
-    /** */
-    private GridDhtPartitionTopology top;
-
-    /** */
-    private final Collection<SupplyWorker> workers = new LinkedList<>();
-
-    /** */
-    private final BlockingQueue<DemandMessage> queue = new LinkedBlockingDeque<>();
-
-    /** */
-    private final boolean depEnabled;
-
-    /** Preload predicate. */
-    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
-    /**
-     * @param cctx Cache context.
-     * @param busyLock Shutdown lock.
-     */
-    GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
-        assert cctx != null;
-        assert busyLock != null;
-
-        this.cctx = cctx;
-        this.busyLock = busyLock;
-
-        log = cctx.logger(getClass());
-
-        top = cctx.dht().topology();
-
-        if (!cctx.kernalContext().clientNode()) {
-            int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
-            for (int i = 0; i < poolSize; i++)
-                workers.add(new SupplyWorker());
-
-            cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
-                @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
-                    processDemandMessage(id, m);
-                }
-            });
-        }
-
-        depEnabled = cctx.gridDeploy().enabled();
-    }
-
-    /**
-     *
-     */
-    void start() {
-        for (SupplyWorker w : workers)
-            new IgniteThread(cctx.gridName(), "preloader-supply-worker", w).start();
-    }
-
-    /**
-     *
-     */
-    void stop() {
-        U.cancel(workers);
-        U.join(workers, log);
-
-        top = null;
-    }
-
-    /**
-     * Sets preload predicate for supply pool.
-     *
-     * @param preloadPred Preload predicate.
-     */
-    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
-        this.preloadPred = preloadPred;
-    }
-
-    /**
-     * @return Size of this thread pool.
-     */
-    int poolSize() {
-        return cctx.config().getRebalanceThreadPoolSize();
-    }
-
-    /**
-     * @return {@code true} if entered to busy state.
-     */
-    private boolean enterBusy() {
-        if (busyLock.readLock().tryLock())
-            return true;
-
-        if (log.isDebugEnabled())
-            log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId());
-
-        return false;
-    }
-
-    /**
-     * @param nodeId Sender node ID.
-     * @param d Message.
-     */
-    private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage d) {
-        if (!enterBusy())
-            return;
-
-        try {
-            if (cctx.rebalanceEnabled()) {
-                if (log.isDebugEnabled())
-                    log.debug("Received partition demand [node=" + nodeId + ", demand=" + d + ']');
-
-                queue.offer(new DemandMessage(nodeId, d));
-            }
-            else
-                U.warn(log, "Received partition demand message when rebalancing is disabled (will ignore): " + d);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     *
-     */
-    private void leaveBusy() {
-        busyLock.readLock().unlock();
-    }
-
-    /**
-     * @param deque Deque to poll from.
-     * @param w Worker.
-     * @return Polled item.
-     * @throws InterruptedException If interrupted.
-     */
-    @Nullable private <T> T poll(BlockingQueue<T> deque, GridWorker w) throws InterruptedException {
-        assert w != null;
-
-        // There is currently a case where {@code interrupted}
-        // flag on a thread gets flipped during stop which causes the pool to hang.  This check
-        // will always make sure that interrupted flag gets reset before going into wait conditions.
-        // The true fix should actually make sure that interrupted flag does not get reset or that
-        // interrupted exception gets propagated. Until we find a real fix, this method should
-        // always work to make sure that there is no hanging during stop.
-        if (w.isCancelled())
-            Thread.currentThread().interrupt();
-
-        return deque.poll(2000, MILLISECONDS);
-    }
-
-    /**
-     * Supply work.
-     */
-    private class SupplyWorker extends GridWorker {
-        /** Hide worker logger and use cache logger. */
-        private IgniteLogger log = GridDhtPartitionSupplyPool.this.log;
-
-        /**
-         * Default constructor.
-         */
-        private SupplyWorker() {
-            super(cctx.gridName(), "preloader-supply-worker", GridDhtPartitionSupplyPool.this.log);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            while (!isCancelled()) {
-                DemandMessage msg = poll(queue, this);
-
-                if (msg == null)
-                    continue;
-
-                ClusterNode node = cctx.discovery().node(msg.senderId());
-
-                if (node == null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Received message from non-existing node (will ignore): " + msg);
-
-                    continue;
-                }
-
-                processMessage(msg, node);
-            }
-        }
-
-        /**
-         * @param msg Message.
-         * @param node Demander.
-         */
-        private void processMessage(DemandMessage msg, ClusterNode node) {
-            assert msg != null;
-            assert node != null;
-
-            GridDhtPartitionDemandMessage d = msg.message();
-
-            GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
-                d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled());
-
-            long preloadThrottle = cctx.config().getRebalanceThrottle();
-
-            boolean ack = false;
-
-            try {
-                for (int part : d.partitions()) {
-                    GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
-
-                    if (loc == null || loc.state() != OWNING || !loc.reserve()) {
-                        // Reply with partition of "-1" to let sender know that
-                        // this node is no longer an owner.
-                        s.missed(part);
-
-                        if (log.isDebugEnabled())
-                            log.debug("Requested partition is not owned by local node [part=" + part +
-                                ", demander=" + msg.senderId() + ']');
-
-                        continue;
-                    }
-
-                    GridCacheEntryInfoCollectSwapListener swapLsnr = null;
-
-                    try {
-                        if (cctx.isSwapOrOffheapEnabled()) {
-                            swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
-
-                            cctx.swap().addOffHeapListener(part, swapLsnr);
-                            cctx.swap().addSwapListener(part, swapLsnr);
-                        }
-
-                        boolean partMissing = false;
-
-                        for (GridCacheEntryEx e : loc.entries()) {
-                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                // Demander no longer needs this partition, so we send '-1' partition and move on.
-                                s.missed(part);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Demanding node does not need requested partition [part=" + part +
-                                        ", nodeId=" + msg.senderId() + ']');
-
-                                partMissing = true;
-
-                                break;
-                            }
-
-                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                ack = true;
-
-                                if (!reply(node, d, s))
-                                    return;
-
-                                // Throttle preloading.
-                                if (preloadThrottle > 0)
-                                    U.sleep(preloadThrottle);
-
-                                s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
-                                    cctx.cacheId(), cctx.deploymentEnabled());
-                            }
-
-                            GridCacheEntryInfo info = e.info();
-
-                            if (info != null && !info.isNew()) {
-                                if (preloadPred == null || preloadPred.apply(info))
-                                    s.addEntry(part, info, cctx);
-                                else if (log.isDebugEnabled())
-                                    log.debug("Rebalance predicate evaluated to false (will not send cache entry): " +
-                                        info);
-                            }
-                        }
-
-                        if (partMissing)
-                            continue;
-
-                        if (cctx.isSwapOrOffheapEnabled()) {
-                            GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
-                                cctx.swap().iterator(part);
-
-                            // Iterator may be null if space does not exist.
-                            if (iter != null) {
-                                try {
-                                    boolean prepared = false;
-
-                                    for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
-                                        if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                            // Demander no longer needs this partition,
-                                            // so we send '-1' partition and move on.
-                                            s.missed(part);
-
-                                            if (log.isDebugEnabled())
-                                                log.debug("Demanding node does not need requested partition " +
-                                                    "[part=" + part + ", nodeId=" + msg.senderId() + ']');
-
-                                            partMissing = true;
-
-                                            break; // For.
-                                        }
-
-                                        if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                            ack = true;
-
-                                            if (!reply(node, d, s))
-                                                return;
-
-                                            // Throttle preloading.
-                                            if (preloadThrottle > 0)
-                                                U.sleep(preloadThrottle);
-
-                                            s = new GridDhtPartitionSupplyMessage(d.workerId(),
-                                                d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled());
-                                        }
-
-                                        GridCacheSwapEntry swapEntry = e.getValue();
-
-                                        GridCacheEntryInfo info = new GridCacheEntryInfo();
-
-                                        info.keyBytes(e.getKey());
-                                        info.ttl(swapEntry.ttl());
-                                        info.expireTime(swapEntry.expireTime());
-                                        info.version(swapEntry.version());
-                                        info.value(swapEntry.value());
-
-                                        if (preloadPred == null || preloadPred.apply(info))
-                                            s.addEntry0(part, info, cctx);
-                                        else {
-                                            if (log.isDebugEnabled())
-                                                log.debug("Rebalance predicate evaluated to false (will not send " +
-                                                    "cache entry): " + info);
-
-                                            continue;
-                                        }
-
-                                        // Need to manually prepare cache message.
-                                        if (depEnabled && !prepared) {
-                                            ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
-                                                cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
-                                                swapEntry.valueClassLoaderId() != null ?
-                                                    cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
-                                                    null;
-
-                                            if (ldr == null)
-                                                continue;
-
-                                            if (ldr instanceof GridDeploymentInfo) {
-                                                s.prepare((GridDeploymentInfo)ldr);
-
-                                                prepared = true;
-                                            }
-                                        }
-                                    }
-
-                                    if (partMissing)
-                                        continue;
-                                }
-                                finally {
-                                    iter.close();
-                                }
-                            }
-                        }
-
-                        // Stop receiving promote notifications.
-                        if (swapLsnr != null) {
-                            cctx.swap().removeOffHeapListener(part, swapLsnr);
-                            cctx.swap().removeSwapListener(part, swapLsnr);
-                        }
-
-                        if (swapLsnr != null) {
-                            Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
-
-                            swapLsnr = null;
-
-                            for (GridCacheEntryInfo info : entries) {
-                                if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                    // Demander no longer needs this partition,
-                                    // so we send '-1' partition and move on.
-                                    s.missed(part);
-
-                                    if (log.isDebugEnabled())
-                                        log.debug("Demanding node does not need requested partition " +
-                                            "[part=" + part + ", nodeId=" + msg.senderId() + ']');
-
-                                    // No need to continue iteration over swap entries.
-                                    break;
-                                }
-
-                                if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                    ack = true;
-
-                                    if (!reply(node, d, s))
-                                        return;
-
-                                    s = new GridDhtPartitionSupplyMessage(d.workerId(),
-                                        d.updateSequence(),
-                                        cctx.cacheId(), cctx.deploymentEnabled());
-                                }
-
-                                if (preloadPred == null || preloadPred.apply(info))
-                                    s.addEntry(part, info, cctx);
-                                else if (log.isDebugEnabled())
-                                    log.debug("Rebalance predicate evaluated to false (will not send cache entry): " +
-                                        info);
-                            }
-                        }
-
-                        // Mark as last supply message.
-                        s.last(part);
-
-                        if (ack) {
-                            s.markAck();
-
-                            break; // Partition for loop.
-                        }
-                    }
-                    finally {
-                        loc.release();
-
-                        if (swapLsnr != null) {
-                            cctx.swap().removeOffHeapListener(part, swapLsnr);
-                            cctx.swap().removeSwapListener(part, swapLsnr);
-                        }
-                    }
-                }
-
-                reply(node, d, s);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
-            }
-        }
-
-        /**
-         * @param n Node.
-         * @param d Demand message.
-         * @param s Supply message.
-         * @return {@code True} if message was sent, {@code false} if recipient left grid.
-         * @throws IgniteCheckedException If failed.
-         */
-        private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
-            throws IgniteCheckedException {
-            try {
-                if (log.isDebugEnabled())
-                    log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
-
-                cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
-
-                return true;
-            }
-            catch (ClusterTopologyCheckedException ignore) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send partition supply message because node left grid: " + n.id());
-
-                return false;
-            }
-        }
-    }
-
-    /**
-     * Demand message wrapper.
-     */
-    private static class DemandMessage extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * @param sndId Sender ID.
-         * @param msg Message.
-         */
-        DemandMessage(UUID sndId, GridDhtPartitionDemandMessage msg) {
-            super(sndId, msg);
-        }
-
-        /**
-         * Empty constructor required for {@link Externalizable}.
-         */
-        public DemandMessage() {
-            // No-op.
-        }
-
-        /**
-         * @return Sender ID.
-         */
-        UUID senderId() {
-            return get1();
-        }
-
-        /**
-         * @return Message.
-         */
-        public GridDhtPartitionDemandMessage message() {
-            return get2();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']';
-        }
-    }
-}
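
The class deleted above handled rebalancing supply with a fixed pool of SupplyWorker threads draining one shared queue: the demand-message handler only enqueued (nodeId, demand) pairs, and each worker polled with a 2-second timeout (so cancellation is noticed even on an idle queue) before walking the requested partitions and building batched supply messages. This commit retires that design in favour of the new GridDhtPartitionSupplier implementation it adds. Below is a minimal, self-contained sketch of just the queue-and-workers shape for orientation; all names are illustrative and none of this is Ignite API.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch of the retired queue-and-workers layout (not Ignite API). */
public class SupplyPoolSketch {
    /** Shared demand queue; the message handler is a producer, workers are consumers. */
    private final BlockingQueue<String> demands = new LinkedBlockingDeque<>();

    private final List<Thread> workers = new ArrayList<>();

    private volatile boolean stopped;

    public SupplyPoolSketch(int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            Thread worker = new Thread(this::workerBody, "preloader-supply-worker-" + i);

            workers.add(worker);

            worker.start();
        }
    }

    /** Demand handler: cheap enqueue, no partition iteration on the IO thread. */
    public void onDemand(String demanderNodeId) {
        demands.offer(demanderNodeId);
    }

    private void workerBody() {
        while (!stopped) {
            try {
                // Bounded poll (the removed code used 2000 ms) so a stop request
                // is noticed even when no demands arrive.
                String demander = demands.poll(2000, TimeUnit.MILLISECONDS);

                if (demander != null)
                    supply(demander); // Iterate owned partitions, send batched supply messages.
            }
            catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();

                return;
            }
        }
    }

    private void supply(String demanderNodeId) {
        System.out.println("Supplying requested partitions to " + demanderNodeId);
    }

    public void stop() {
        stopped = true;

        workers.forEach(Thread::interrupt);
    }
}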

http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index cef38e8..2f2944d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -742,6 +742,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     // Must initialize topology after we get discovery event.
                     initTopology(cacheCtx);
 
+                    cacheCtx.preloader().onTopologyChanged(exchId.topologyVersion());
+
                     cacheCtx.preloader().updateLastExchangeFuture(this);
                 }