You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/27 10:00:41 UTC

[3/5] ignite git commit: Ignite-1093

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
new file mode 100644
index 0000000..6479542
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -0,0 +1,1310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+import org.apache.ignite.internal.util.GridLeanSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
+import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
+import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
+
+/**
+ * Thread pool for requesting partitions from other nodes and populating local cache.
+ */
+@SuppressWarnings("NonConstantFieldWithUpperCaseName")
+public class GridDhtPartitionDemander {
+    /** */
+    private final GridCacheContext<?, ?> cctx;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** Preload predicate. */
+    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+    /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
+    @GridToStringInclude
+    private final GridFutureAdapter syncFut = new GridFutureAdapter();
+
+    /** Rebalance future. */
+    @GridToStringInclude
+    private volatile RebalanceFuture rebalanceFut;
+
+    /** Last timeout object. */
+    private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
+
+    /** Last exchange future. */
+    private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
+
+    /** Demand lock. */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private final ReadWriteLock demandLock;
+
+    /**
+     * @param cctx Cctx.
+     * @param demandLock Demand lock.
+     */
+    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock demandLock) {
+        assert cctx != null;
+
+        this.cctx = cctx;
+        this.demandLock = demandLock;
+
+        log = cctx.logger(getClass());
+
+        boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
+
+        rebalanceFut = new RebalanceFuture();//Dummy.
+
+        if (!enabled) {
+            // Calling onDone() immediately since preloading is disabled.
+            rebalanceFut.onDone(true);
+            syncFut.onDone();
+        }
+    }
+
+    /**
+     * Start.
+     */
+    void start() {
+    }
+
+    /**
+     * Stop.
+     */
+    void stop() {
+        lastExchangeFut = null;
+
+        lastTimeoutObj.set(null);
+    }
+
+    /**
+     * @return Future for {@link CacheRebalanceMode#SYNC} mode.
+     */
+    IgniteInternalFuture<?> syncFuture() {
+        return syncFut;
+    }
+
+    /**
+     * @return Rebalance future.
+     */
+    IgniteInternalFuture<Boolean> rebalanceFuture() {
+        return rebalanceFut;
+    }
+
+    /**
+     * Sets preload predicate for demand pool.
+     *
+     * @param preloadPred Preload predicate.
+     */
+    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+        this.preloadPred = preloadPred;
+    }
+
+    /**
+     * Force preload.
+     */
+    void forcePreload() {
+        GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
+
+        if (obj != null)
+            cctx.time().removeTimeoutObject(obj);
+
+        final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
+
+        if (exchFut != null) {
+            if (log.isDebugEnabled())
+                log.debug("Forcing rebalance event for future: " + exchFut);
+
+            exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                    cctx.shared().exchange().forcePreloadExchange(exchFut);
+                }
+            });
+        }
+        else if (log.isDebugEnabled())
+            log.debug("Ignoring force rebalance request (no topology event happened yet).");
+    }
+
+    /**
+     * @param fut Future.
+     * @return {@code True} if topology changed.
+     */
+    private boolean topologyChanged(RebalanceFuture fut) {
+        return
+            !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed.
+                fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions.
+    }
+
+    /**
+     * @param part Partition.
+     * @param type Type.
+     * @param discoEvt Discovery event.
+     */
+    private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
+        assert discoEvt != null;
+
+        cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+    }
+
+    /**
+     * @param name Cache name.
+     * @param fut Future.
+     */
+    private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Waiting for " + name + " cache rebalancing [cacheName=" + cctx.name() + ']');
+
+        RebalanceFuture wFut = (RebalanceFuture)cctx.kernalContext().cache().internalCache(name).preloader().rebalanceFuture();
+
+        if (!topologyChanged(fut) && wFut.updateSeq == fut.updateSeq) {
+            if (!wFut.get()) {
+                U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() +
+                    "] (cache rebalanced with missed partitions)");
+
+                return false;
+            }
+
+            return true;
+        }
+        else {
+            U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() +
+                "] (topology already changed)");
+
+            return false;
+        }
+    }
+
+    /**
+     * @param assigns Assignments.
+     * @param force {@code True} if dummy reassign.
+     * @param caches Rebalancing of these caches will be finished before this started.
+     * @param cnt Counter.
+     * @throws IgniteCheckedException Exception
+     */
+    Callable<Boolean> addAssignments(final GridDhtPreloaderAssignments assigns, boolean force,
+        final Collection<String> caches, int cnt) {
+        if (log.isDebugEnabled())
+            log.debug("Adding partition assignments: " + assigns);
+
+        long delay = cctx.config().getRebalanceDelay();
+
+        if (delay == 0 || force) {
+            assert assigns != null;
+
+            final RebalanceFuture oldFut = rebalanceFut;
+
+            final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, oldFut.isInitial(), cnt);
+
+            if (!oldFut.isInitial())
+                oldFut.cancel();
+            else
+                fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                    @Override public void apply(IgniteInternalFuture<Boolean> future) {
+                        oldFut.onDone(fut.result());
+                    }
+                });
+
+            rebalanceFut = fut;
+
+            if (assigns.isEmpty()) {
+                fut.doneIfEmpty();
+
+                return null;
+            }
+
+            return new Callable<Boolean>() {
+                @Override public Boolean call() throws Exception {
+                    for (String c : caches) {
+                        if (!waitForCacheRebalancing(c, fut))
+                            return false;
+                    }
+
+                    return requestPartitions(fut, assigns);
+                }
+            };
+        }
+        else if (delay > 0) {
+            GridTimeoutObject obj = lastTimeoutObj.get();
+
+            if (obj != null)
+                cctx.time().removeTimeoutObject(obj);
+
+            final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
+
+            assert exchFut != null : "Delaying rebalance process without topology event.";
+
+            obj = new GridTimeoutObjectAdapter(delay) {
+                @Override public void onTimeout() {
+                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
+                            cctx.shared().exchange().forcePreloadExchange(exchFut);
+                        }
+                    });
+                }
+            };
+
+            lastTimeoutObj.set(obj);
+
+            cctx.time().addTimeoutObject(obj);
+        }
+
+        return null;
+    }
+
+    /**
+     * @param fut Future.
+     */
+    private boolean requestPartitions(RebalanceFuture fut,
+        GridDhtPreloaderAssignments assigns) throws IgniteCheckedException {
+        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
+            if (topologyChanged(fut))
+                return false;
+
+            final ClusterNode node = e.getKey();
+
+            GridDhtPartitionDemandMessage d = e.getValue();
+
+            fut.appendPartitions(node.id(), d.partitions());//Future preparation.
+        }
+
+        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
+            final ClusterNode node = e.getKey();
+
+            final CacheConfiguration cfg = cctx.config();
+
+            final Collection<Integer> parts = fut.remaining.get(node.id()).get2();
+
+            GridDhtPartitionDemandMessage d = e.getValue();
+
+            //Check remote node rebalancing API version.
+            if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) {
+                U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+                    ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
+                    ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
+
+                int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
+
+                List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+
+                for (int cnt = 0; cnt < lsnrCnt; cnt++)
+                    sParts.add(new HashSet<Integer>());
+
+                Iterator<Integer> it = parts.iterator();
+
+                int cnt = 0;
+
+                while (it.hasNext())
+                    sParts.get(cnt++ % lsnrCnt).add(it.next());
+
+                for (cnt = 0; cnt < lsnrCnt; cnt++) {
+                    if (!sParts.get(cnt).isEmpty()) {
+
+                        // Create copy.
+                        GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+
+                        initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
+                        initD.updateSequence(fut.updateSeq);
+                        initD.timeout(cctx.config().getRebalanceTimeout());
+
+                        cctx.io().sendOrderedMessage(node,
+                            GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), initD.timeout());
+
+                        if (log.isDebugEnabled())
+                            log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
+                                cnt + ", partitions count=" + sParts.get(cnt).size() +
+                                " (" + partitionsList(sParts.get(cnt)) + ")]");
+                    }
+                }
+            }
+            else {
+                U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+                    ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
+                    ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
+
+                d.timeout(cctx.config().getRebalanceTimeout());
+                d.workerId(0);//old api support.
+
+                DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut);
+
+                dw.run(node, d);
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * @param c Partitions.
+     * @return String representation of partitions list.
+     */
+    private String partitionsList(Collection<Integer> c) {
+        LinkedList<Integer> s = new LinkedList<>(c);
+
+        Collections.sort(s);
+
+        StringBuilder sb = new StringBuilder();
+
+        int start = -1;
+
+        int prev = -1;
+
+        Iterator<Integer> sit = s.iterator();
+
+        while (sit.hasNext()) {
+            int p = sit.next();
+            if (start == -1) {
+                start = p;
+                prev = p;
+            }
+
+            if (prev < p - 1) {
+                sb.append(start);
+
+                if (start != prev)
+                    sb.append("-").append(prev);
+
+                sb.append(", ");
+
+                start = p;
+            }
+
+            if (!sit.hasNext()) {
+                sb.append(start);
+
+                if (start != p)
+                    sb.append("-").append(p);
+            }
+
+            prev = p;
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * @param idx Index.
+     * @param id Node id.
+     * @param supply Supply.
+     */
+    public void handleSupplyMessage(
+        int idx,
+        final UUID id,
+        final GridDhtPartitionSupplyMessageV2 supply) {
+        AffinityTopologyVersion topVer = supply.topologyVersion();
+
+        final RebalanceFuture fut = rebalanceFut;
+
+        ClusterNode node = cctx.node(id);
+
+        if (node == null)
+            return;
+
+        if (!fut.isActual(supply.updateSequence())) // Current future have another update sequence.
+            return; // Supple message based on another future.
+
+        if (topologyChanged(fut)) { // Topology already changed (for the future that supply message based on).
+            return;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Received supply message: " + supply);
+
+        // Check whether there were class loading errors on unmarshal
+        if (supply.classError() != null) {
+            U.warn(log, "Class got undeployed during preloading: " + supply.classError());
+
+            fut.cancel(id);
+
+            return;
+        }
+
+        final GridDhtPartitionTopology top = cctx.dht().topology();
+
+        try {
+            // Preload.
+            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+                int p = e.getKey();
+
+                if (cctx.affinity().localNode(p, topVer)) {
+                    GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+                    assert part != null;
+
+                    if (part.state() == MOVING) {
+                        boolean reserved = part.reserve();
+
+                        assert reserved : "Failed to reserve partition [gridName=" +
+                            cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+                        part.lock();
+
+                        try {
+                            // Loop through all received entries and try to preload them.
+                            for (GridCacheEntryInfo entry : e.getValue().infos()) {
+                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Preloading is not permitted for entry due to " +
+                                            "evictions [key=" + entry.key() +
+                                            ", ver=" + entry.version() + ']');
+
+                                    continue;
+                                }
+                                if (!preloadEntry(node, p, entry, topVer)) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got entries for invalid partition during " +
+                                            "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+
+                                    break;
+                                }
+                            }
+
+                            boolean last = supply.last().contains(p);
+
+                            // If message was last for this partition,
+                            // then we take ownership.
+                            if (last) {
+                                top.own(part);
+
+                                fut.partitionDone(id, p);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Finished rebalancing partition: " + part);
+                            }
+                        }
+                        finally {
+                            part.unlock();
+                            part.release();
+                        }
+                    }
+                    else {
+                        fut.partitionDone(id, p);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+                    }
+                }
+                else {
+                    fut.partitionDone(id, p);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+                }
+            }
+
+            // Only request partitions based on latest topology version.
+            for (Integer miss : supply.missed())
+                if (cctx.affinity().localNode(miss, topVer))
+                    fut.partitionMissed(id, miss);
+
+            for (Integer miss : supply.missed())
+                fut.partitionDone(id, miss);
+
+            GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
+                supply.updateSequence(), supply.topologyVersion(), cctx.cacheId());
+
+            d.timeout(cctx.config().getRebalanceTimeout());
+
+            d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
+
+            if (!topologyChanged(fut) && !fut.isDone()) {
+                // Send demand message.
+                cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+                    d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+            }
+        }
+        catch (IgniteCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Node left during rebalancing [node=" + node.id() +
+                    ", msg=" + e.getMessage() + ']');
+        }
+    }
+
+    /**
+     * @param pick Node picked for preloading.
+     * @param p Partition.
+     * @param entry Preloaded entry.
+     * @param topVer Topology version.
+     * @return {@code False} if partition has become invalid during preloading.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    private boolean preloadEntry(
+        ClusterNode pick,
+        int p,
+        GridCacheEntryInfo entry,
+        AffinityTopologyVersion topVer
+    ) throws IgniteCheckedException {
+        try {
+            GridCacheEntryEx cached = null;
+
+            try {
+                cached = cctx.dht().entryEx(entry.key());
+
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
+
+                if (cctx.dht().isIgfsDataCache() &&
+                    cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
+                    LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
+                        "value, will ignore rebalance entries)");
+
+                    if (cached.markObsoleteIfEmpty(null))
+                        cached.context().cache().removeIfObsolete(cached.key());
+
+                    return true;
+                }
+
+                if (preloadPred == null || preloadPred.apply(entry)) {
+                    if (cached.initialValue(
+                        entry.value(),
+                        entry.version(),
+                        entry.ttl(),
+                        entry.expireTime(),
+                        true,
+                        topVer,
+                        cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+                    )) {
+                        cctx.evicts().touch(cached, topVer); // Start tracking.
+
+                        if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
+                            cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
+                                (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
+                                false, null, null, null);
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
+                            ", part=" + p + ']');
+                }
+                else if (log.isDebugEnabled())
+                    log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
+                        cached.key() + ", part=" + p + ']');
+            }
+            catch (GridDhtInvalidPartitionException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Partition became invalid during rebalancing (will ignore): " + p);
+
+                return false;
+            }
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw e;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
+                cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtPartitionDemander.class, this);
+    }
+
+    /**
+     * Sets last exchange future.
+     *
+     * @param lastFut Last future to set.
+     */
+    void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
+        lastExchangeFut = lastFut;
+    }
+
+    /**
+     *
+     */
+    public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
+        /** */
+        private static final long serialVersionUID = 1L;
+
+        /** Should EVT_CACHE_REBALANCE_STOPPED event be sent of not. */
+        private final boolean sendStoppedEvnt;
+
+        /** */
+        private final GridCacheContext<?, ?> cctx;
+
+        /** */
+        private final IgniteLogger log;
+
+        /** Remaining. T2: startTime, partitions */
+        private final Map<UUID, T2<Long, Collection<Integer>>> remaining = new HashMap<>();
+
+        /** Missed. */
+        private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
+
+        /** Exchange future. */
+        @GridToStringExclude
+        private final GridDhtPartitionsExchangeFuture exchFut;
+
+        /** Topology version. */
+        private final AffinityTopologyVersion topVer;
+
+        /** Unique (per demander) sequence id. */
+        private final long updateSeq;
+
+        /**
+         * @param assigns Assigns.
+         * @param cctx Context.
+         * @param log Logger.
+         * @param sentStopEvnt Stop event flag.
+         */
+        RebalanceFuture(GridDhtPreloaderAssignments assigns,
+            GridCacheContext<?, ?> cctx,
+            IgniteLogger log,
+            boolean sentStopEvnt,
+            long updateSeq) {
+            assert assigns != null;
+
+            this.exchFut = assigns.exchangeFuture();
+            this.topVer = assigns.topologyVersion();
+            this.cctx = cctx;
+            this.log = log;
+            this.sendStoppedEvnt = sentStopEvnt;
+            this.updateSeq = updateSeq;
+        }
+
+        /**
+         * Dummy future. Will be done by real one.
+         */
+        public RebalanceFuture() {
+            this.exchFut = null;
+            this.topVer = null;
+            this.cctx = null;
+            this.log = null;
+            this.sendStoppedEvnt = false;
+            this.updateSeq = -1;
+        }
+
+        /**
+         * @return Topology version.
+         */
+        public AffinityTopologyVersion topologyVersion() {
+            return topVer;
+        }
+
+        /**
+         * @param updateSeq Update sequence.
+         * @return true in case future created for specified updateSeq, false in other case.
+         */
+        private boolean isActual(long updateSeq) {
+            return this.updateSeq == updateSeq;
+        }
+
+        /**
+         * @return Is initial (created at demander creation).
+         */
+        private boolean isInitial() {
+            return topVer == null;
+        }
+
+        /**
+         * @param nodeId Node id.
+         * @param parts Parts.
+         */
+        private void appendPartitions(UUID nodeId, Collection<Integer> parts) {
+            synchronized (this) {
+                remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts));
+            }
+        }
+
+        /**
+         *
+         */
+        private void doneIfEmpty() {
+            synchronized (this) {
+                if (isDone())
+                    return;
+
+                assert remaining.isEmpty();
+
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing is not required [cache=" + cctx.name() +
+                        ", topology=" + topVer + "]");
+
+                checkIsDone();
+            }
+        }
+
+        /**
+         * Cancels this future.
+         *
+         * @return {@code true}.
+         */
+        @Override public boolean cancel() {
+            synchronized (this) {
+                if (isDone())
+                    return true;
+
+                remaining.clear();
+
+                U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
+                    + ", topology=" + topologyVersion());
+
+                checkIsDone(true /* cancelled */);
+            }
+
+            return true;
+        }
+
+        /**
+         * @param nodeId Node id.
+         */
+        private void cancel(UUID nodeId) {
+            synchronized (this) {
+                if (isDone())
+                    return;
+
+                U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+                    ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
+                    ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
+
+                remaining.remove(nodeId);
+
+                checkIsDone();
+            }
+
+        }
+
+        /**
+         * @param nodeId Node id.
+         * @param p P.
+         */
+        private void partitionMissed(UUID nodeId, int p) {
+            synchronized (this) {
+                if (isDone())
+                    return;
+
+                if (missed.get(nodeId) == null)
+                    missed.put(nodeId, new HashSet<Integer>());
+
+                missed.get(nodeId).add(p);
+            }
+        }
+
+        /**
+         * @param nodeId Node id.
+         * @param p P.
+         */
+        private void partitionDone(UUID nodeId, int p) {
+            synchronized (this) {
+                if (isDone())
+                    return;
+
+                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                    preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+                        exchFut.discoveryEvent());
+
+                Collection<Integer> parts = remaining.get(nodeId).get2();
+
+                if (parts != null) {
+                    parts.remove(p);
+
+                    if (parts.isEmpty()) {
+                        U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
+                            "rebalancing [cache=" + cctx.name() +
+                            ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
+                            ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
+
+                        remaining.remove(nodeId);
+                    }
+                }
+
+                checkIsDone();
+            }
+        }
+
+        /**
+         * @param part Partition.
+         * @param type Type.
+         * @param discoEvt Discovery event.
+         */
+        private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
+            assert discoEvt != null;
+
+            cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+        }
+
+        /**
+         * @param type Type.
+         * @param discoEvt Discovery event.
+         */
+        private void preloadEvent(int type, DiscoveryEvent discoEvt) {
+            preloadEvent(-1, type, discoEvt);
+        }
+
+        /**
+         *
+         */
+        private void checkIsDone() {
+            checkIsDone(false);
+        }
+
+        /**
+         * @param cancelled Is cancelled.
+         */
+        private void checkIsDone(boolean cancelled) {
+            if (remaining.isEmpty()) {
+                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt))
+                    preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+
+                if (log.isDebugEnabled())
+                    log.debug("Completed rebalance future.");
+
+                cctx.shared().exchange().scheduleResendPartitions();
+
+                Collection<Integer> m = new HashSet<>();
+
+                for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
+                    if (e.getValue() != null && !e.getValue().isEmpty())
+                        m.addAll(e.getValue());
+                }
+
+                if (!m.isEmpty()) {
+                    U.log(log, ("Reassigning partitions that were missed: " + m));
+
+                    onDone(false); //Finished but has missed partitions, will force dummy exchange
+
+                    cctx.shared().exchange().forceDummyExchange(true, exchFut);
+
+                    return;
+                }
+
+                if (!cancelled && !cctx.preloader().syncFuture().isDone())
+                    ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+
+                onDone(true);
+            }
+        }
+    }
+
+    /**
+     * Supply message wrapper.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private static class SupplyMessage {
+        /** Sender ID. */
+        private UUID sndId;
+
+        /** Supply message. */
+        private GridDhtPartitionSupplyMessage supply;
+
+        /**
+         * Dummy constructor.
+         */
+        private SupplyMessage() {
+            // No-op.
+        }
+
+        /**
+         * @param sndId Sender ID.
+         * @param supply Supply message.
+         */
+        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
+            this.sndId = sndId;
+            this.supply = supply;
+        }
+
+        /**
+         * @return Sender ID.
+         */
+        UUID senderId() {
+            return sndId;
+        }
+
+        /**
+         * @return Message.
+         */
+        GridDhtPartitionSupplyMessage supply() {
+            return supply;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SupplyMessage.class, this);
+        }
+    }
+
+    /** DemandWorker index. */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private final AtomicInteger dmIdx = new AtomicInteger();
+
+    /**
+     *
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private class DemandWorker {
+        /** Worker ID. */
+        private int id;
+
+        /** Partition-to-node assignments. */
+        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
+
+        /** Message queue. */
+        private final LinkedBlockingDeque<SupplyMessage> msgQ =
+            new LinkedBlockingDeque<>();
+
+        /** Counter. */
+        private long cntr;
+
+        /** Hide worker logger and use cache logger instead. */
+        private IgniteLogger log = GridDhtPartitionDemander.this.log;
+
+        private volatile RebalanceFuture fut;
+
+        /**
+         * @param id Worker ID.
+         */
+        private DemandWorker(int id, RebalanceFuture fut) {
+            assert id >= 0;
+
+            this.id = id;
+            this.fut = fut;
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void addMessage(SupplyMessage msg) {
+            msgQ.offer(msg);
+        }
+
+        /**
+         * @param deque Deque to poll from.
+         * @param time Time to wait.
+         * @return Polled item.
+         * @throws InterruptedException If interrupted.
+         */
+        @Nullable private <T> T poll(BlockingQueue<T> deque, long time) throws InterruptedException {
+            return deque.poll(time, MILLISECONDS);
+        }
+
+        /**
+         * @param idx Unique index for this topic.
+         * @return Topic for partition.
+         */
+        public Object topic(long idx) {
+            return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
+        }
+
+        /**
+         * @param node Node to demand from.
+         * @param topVer Topology version.
+         * @param d Demand message.
+         * @param exchFut Exchange future.
+         * @throws InterruptedException If interrupted.
+         * @throws ClusterTopologyCheckedException If node left.
+         * @throws IgniteCheckedException If failed to send message.
+         */
+        private void demandFromNode(
+            ClusterNode node,
+            final AffinityTopologyVersion topVer,
+            GridDhtPartitionDemandMessage d,
+            GridDhtPartitionsExchangeFuture exchFut
+        ) throws InterruptedException, IgniteCheckedException {
+            GridDhtPartitionTopology top = cctx.dht().topology();
+
+            cntr++;
+
+            d.topic(topic(cntr));
+            d.workerId(id);
+
+            if (topologyChanged(fut))
+                return;
+
+            cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
+                    addMessage(new SupplyMessage(nodeId, msg));
+                }
+            });
+
+            try {
+                boolean retry;
+
+                // DoWhile.
+                // =======
+                do {
+                    retry = false;
+
+                    // Create copy.
+                    d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2());
+
+                    long timeout = cctx.config().getRebalanceTimeout();
+
+                    d.timeout(timeout);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
+
+                    // Send demand message.
+                    cctx.io().send(node, d, cctx.ioPolicy());
+
+                    // While.
+                    // =====
+                    while (!topologyChanged(fut)) {
+                        SupplyMessage s = poll(msgQ, timeout);
+
+                        // If timed out.
+                        if (s == null) {
+                            if (msgQ.isEmpty()) { // Safety check.
+                                U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
+                                    " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
+                                    " configuration properties).");
+
+                                // Ordered listener was removed if timeout expired.
+                                cctx.io().removeOrderedHandler(d.topic());
+
+                                // Must create copy to be able to work with IO manager thread local caches.
+                                d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2());
+
+                                // Create new topic.
+                                d.topic(topic(++cntr));
+
+                                // Create new ordered listener.
+                                cctx.io().addOrderedHandler(d.topic(),
+                                    new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                                        @Override public void apply(UUID nodeId,
+                                            GridDhtPartitionSupplyMessage msg) {
+                                            addMessage(new SupplyMessage(nodeId, msg));
+                                        }
+                                    });
+
+                                // Resend message with larger timeout.
+                                retry = true;
+
+                                break; // While.
+                            }
+                            else
+                                continue; // While.
+                        }
+
+                        // Check that message was received from expected node.
+                        if (!s.senderId().equals(node.id())) {
+                            U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
+                                ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
+
+                            continue; // While.
+                        }
+
+                        if (log.isDebugEnabled())
+                            log.debug("Received supply message: " + s);
+
+                        GridDhtPartitionSupplyMessage supply = s.supply();
+
+                        // Check whether there were class loading errors on unmarshal
+                        if (supply.classError() != null) {
+                            if (log.isDebugEnabled())
+                                log.debug("Class got undeployed during preloading: " + supply.classError());
+
+                            retry = true;
+
+                            // Quit preloading.
+                            break;
+                        }
+
+                        // Preload.
+                        for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+                            int p = e.getKey();
+
+                            if (cctx.affinity().localNode(p, topVer)) {
+                                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+                                assert part != null;
+
+                                if (part.state() == MOVING) {
+                                    boolean reserved = part.reserve();
+
+                                    assert reserved : "Failed to reserve partition [gridName=" +
+                                        cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+                                    part.lock();
+
+                                    try {
+                                        Collection<Integer> invalidParts = new GridLeanSet<>();
+
+                                        // Loop through all received entries and try to preload them.
+                                        for (GridCacheEntryInfo entry : e.getValue().infos()) {
+                                            if (!invalidParts.contains(p)) {
+                                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
+                                                    if (log.isDebugEnabled())
+                                                        log.debug("Preloading is not permitted for entry due to " +
+                                                            "evictions [key=" + entry.key() +
+                                                            ", ver=" + entry.version() + ']');
+
+                                                    continue;
+                                                }
+
+                                                if (!preloadEntry(node, p, entry, topVer)) {
+                                                    invalidParts.add(p);
+
+                                                    if (log.isDebugEnabled())
+                                                        log.debug("Got entries for invalid partition during " +
+                                                            "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+                                                }
+                                            }
+                                        }
+
+                                        boolean last = supply.last().contains(p);
+
+                                        // If message was last for this partition,
+                                        // then we take ownership.
+                                        if (last) {
+                                            fut.partitionDone(node.id(), p);
+
+                                            top.own(part);
+
+                                            if (log.isDebugEnabled())
+                                                log.debug("Finished rebalancing partition: " + part);
+
+                                            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                                                preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+                                                    exchFut.discoveryEvent());
+                                        }
+                                    }
+                                    finally {
+                                        part.unlock();
+                                        part.release();
+                                    }
+                                }
+                                else {
+                                    fut.partitionDone(node.id(), p);
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+                                }
+                            }
+                            else {
+                                fut.partitionDone(node.id(), p);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+                            }
+                        }
+
+                        // Only request partitions based on latest topology version.
+                        for (Integer miss : s.supply().missed()) {
+                            if (cctx.affinity().localNode(miss, topVer))
+                                fut.partitionMissed(node.id(), miss);
+                        }
+
+                        if (fut.remaining.get(node.id()) == null)
+                            break; // While.
+
+                        if (s.supply().ack()) {
+                            retry = true;
+
+                            break;
+                        }
+                    }
+                }
+                while (retry && !topologyChanged(fut));
+            }
+            finally {
+                cctx.io().removeOrderedHandler(d.topic());
+            }
+        }
+
+        /**
+         * @param node Node.
+         * @param d D.
+         */
+        public void run(ClusterNode node, GridDhtPartitionDemandMessage d) throws IgniteCheckedException {
+            demandLock.readLock().lock();
+
+            try {
+                GridDhtPartitionsExchangeFuture exchFut = fut.exchFut;
+
+                AffinityTopologyVersion topVer = fut.topVer;
+
+                try {
+                    demandFromNode(node, topVer, d, exchFut);
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteCheckedException(e);
+                }
+            }
+            finally {
+                demandLock.readLock().unlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
+        }
+    }
+}