You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/31 16:26:51 UTC

[1/6] ignite git commit: Ignite-1093 Improved rebalancing

Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 [created] 0e6866c55


http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
index 5eb7060..a0d5b6f 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
@@ -28,6 +28,10 @@ import org.apache.ignite.transactions.*;
 @SuppressWarnings({"UnusedDeclaration", "FieldCanBeLocal"})
 public class IgniteBenchmarkArguments {
     /** */
+    @Parameter(names = {"-cn", "--cacheName"}, description = "Cache name")
+    private String cacheName;
+
+    /** */
     @Parameter(names = {"-nn", "--nodeNumber"}, description = "Node number")
     private int nodes = 1;
 
@@ -265,11 +269,18 @@ public class IgniteBenchmarkArguments {
     }
 
     /**
+     * @return Cache name.
+     */
+    public String cacheName() {
+        return cacheName;
+    }
+
+    /**
      * @return Description.
      */
     public String description() {
         return "-nn=" + nodes + "-b=" + backups + "-sm=" + syncMode + "-cl=" + clientOnly + "-nc=" + nearCacheFlag +
-            (orderMode == null ? "" : "-wom=" + orderMode) + "-txc=" + txConcurrency;
+            (orderMode == null ? "" : "-wom=" + orderMode) + "-txc=" + txConcurrency+ "-cn=" + cacheName;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
index 3ecf904..397c7e9 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
@@ -32,7 +32,10 @@ public abstract class IgniteCacheAbstractBenchmark extends IgniteAbstractBenchma
     @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
         super.setUp(cfg);
 
-        cache = cache();
+        if (args.cacheName() != null)
+            cache = ignite().cache(args.cacheName());
+        else
+            cache = cache();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/rabalance/IgniteRebalancePutGetBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/rabalance/IgniteRebalancePutGetBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/rabalance/IgniteRebalancePutGetBenchmark.java
new file mode 100644
index 0000000..bd4657e
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/rabalance/IgniteRebalancePutGetBenchmark.java
@@ -0,0 +1,72 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.rabalance;
+
+import org.apache.ignite.*;
+import org.apache.ignite.yardstick.cache.*;
+import org.yardstickframework.*;
+
+import java.util.*;
+
+import static org.yardstickframework.BenchmarkUtils.*;
+
+/**
+ * Benchmark that pre-populates a cache through a data streamer and then performs a random
+ * get followed by a put on every iteration.
+ * <p>
+ * The benchmark has no default cache: {@link #cache()} returns {@code null}, so the cache has to be
+ * resolved by name (benchmark argument {@code -cn} / {@code --cacheName}).
+ */
+public class IgniteRebalancePutGetBenchmark extends IgniteCacheAbstractBenchmark {
+    /** {@inheritDoc} */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        // Fail with a descriptive message instead of a bare NPE on cache.getName() below.
+        if (cache == null)
+            throw new IllegalStateException("Benchmark cache is not available [cacheName=" + args.cacheName() +
+                "]. Specify the name of an existing cache with -cn (--cacheName).");
+
+        println(cfg, "Populating query data...");
+
+        long start = System.nanoTime();
+
+        try (IgniteDataStreamer<Integer, Integer> dataLdr = ignite().dataStreamer(cache.getName())) {
+            // Load keys [0, range); stop early if the thread gets interrupted.
+            for (int i = 0; i < args.range() && !Thread.currentThread().isInterrupted(); i++) {
+                dataLdr.addData(i, i);
+
+                if (i % 100000 == 0)
+                    println(cfg, "Populated: " + i);
+            }
+
+            dataLdr.flush();
+        }
+
+        println(cfg, "Finished populating query data in " + ((System.nanoTime() - start) / 1_000_000) + " ms.");
+    }
+
+    /**
+     * Reads a random key; if a value is present another random key is picked. The resulting key is
+     * then stored with itself as the value.
+     *
+     * {@inheritDoc}
+     */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        int key = nextRandom(args.range());
+
+        Object val = cache.get(key);
+
+        if (val != null)
+            key = nextRandom(args.range());
+
+        cache.put(key, key);
+
+        return true;
+    }
+
+    /**
+     * @return Always {@code null}: this benchmark does not define a default cache.
+     */
+    @Override protected IgniteCache<Integer, Object> cache() {
+        return null;
+    }
+}


[3/6] ignite git commit: Ignite-1093 Improved rebalancing

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
new file mode 100644
index 0000000..4fe2153
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -0,0 +1,1362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.timeout.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+import static org.apache.ignite.internal.processors.dr.GridDrType.*;
+
+/**
+ * Demands partitions from other nodes during rebalancing and populates the local cache with the supplied entries.
+ */
+@SuppressWarnings("NonConstantFieldWithUpperCaseName")
+public class GridDhtPartitionDemander {
+    /** */
+    private final GridCacheContext<?, ?> cctx;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** Preload predicate. */
+    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+    /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
+    @GridToStringInclude
+    private volatile SyncFuture syncFut;
+
+    /** Last timeout object. */
+    private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
+
+    /** Last exchange future. */
+    private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
+
+    /** Demand lock. */
+    private final ReadWriteLock demandLock;
+
+    /**
+     * Creates the demander. The initial sync future is completed right away when rebalancing is
+     * disabled for the cache or the local node is a client node.
+     *
+     * @param cctx Cache context, must not be {@code null}.
+     * @param demandLock Demand lock.
+     */
+    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock demandLock) {
+        assert cctx != null;
+
+        this.cctx = cctx;
+        this.demandLock = demandLock;
+
+        log = cctx.logger(getClass());
+
+        boolean disabled = !cctx.rebalanceEnabled() || cctx.kernalContext().clientNode();
+
+        SyncFuture initFut = new SyncFuture(null);
+
+        syncFut = initFut;
+
+        // Nothing will ever be preloaded, so nobody should wait on the future.
+        if (disabled)
+            initFut.onDone();
+    }
+
+    /**
+     * Starts the demander. Currently a no-op.
+     */
+    void start() {
+    }
+
+    /**
+     * Stops the demander by dropping the references to the last exchange future and the last timeout object.
+     * <p>
+     * NOTE(review): the pending timeout object is only dereferenced here, whereas {@link #forcePreload()}
+     * also removes it via {@code cctx.time().removeTimeoutObject()} - confirm this is intentional.
+     */
+    void stop() {
+        lastExchangeFut = null;
+
+        lastTimeoutObj.set(null);
+    }
+
+    /**
+     * Returns the current sync future. The instance may be replaced by a new one when new
+     * assignments are added, so callers observe whichever future is current at the time of the call.
+     *
+     * @return Future for {@link CacheRebalanceMode#SYNC} mode.
+     */
+    IgniteInternalFuture<?> syncFuture() {
+        return syncFut;
+    }
+
+    /**
+     * Sets preload predicate for demand pool. While preloading, an entry is stored only if the
+     * predicate is {@code null} or evaluates to {@code true} for it.
+     *
+     * @param preloadPred Preload predicate.
+     */
+    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+        this.preloadPred = preloadPred;
+    }
+
+    /**
+     * Forces preloading: cancels a pending delayed-rebalance timeout, if any, and asks the exchange
+     * manager to force a preload exchange once the last known exchange future completes.
+     * Does nothing else when no exchange future has been recorded yet.
+     */
+    void forcePreload() {
+        GridTimeoutObject pending = lastTimeoutObj.getAndSet(null);
+
+        if (pending != null)
+            cctx.time().removeTimeoutObject(pending);
+
+        final GridDhtPartitionsExchangeFuture lastFut = lastExchangeFut;
+
+        if (lastFut == null) {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring force rebalance request (no topology event happened yet).");
+
+            return;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Forcing rebalance event for future: " + lastFut);
+
+        lastFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> done) {
+                cctx.shared().exchange().forcePreloadExchange(lastFut);
+            }
+        });
+    }
+
+    /**
+     * Checks whether the cache's current affinity topology version differs from the given one.
+     *
+     * @param topVer Topology version.
+     * @return {@code True} if topology changed.
+     */
+    private boolean topologyChanged(AffinityTopologyVersion topVer) {
+        return !cctx.affinity().affinityTopologyVersion().equals(topVer);
+    }
+
+    /**
+     * Records a preload event that is not bound to a particular partition (partition {@code -1} is passed on).
+     *
+     * @param type Type.
+     * @param discoEvt Discovery event.
+     */
+    private void preloadEvent(int type, DiscoveryEvent discoEvt) {
+        preloadEvent(-1, type, discoEvt);
+    }
+
+    /**
+     * Records a preload event, taking the event node, discovery event type and timestamp
+     * from the given discovery event.
+     *
+     * @param part Partition (the two-argument overload passes {@code -1}).
+     * @param type Type.
+     * @param discoEvt Discovery event, must not be {@code null}.
+     */
+    private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
+        assert discoEvt != null;
+
+        cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+    }
+
+    /**
+     * Registers new partition assignments and either starts rebalancing right away or schedules it
+     * after the configured rebalance delay.
+     * <p>
+     * When the delay is {@code 0} or {@code force} is set, the current {@link SyncFuture} is initialized
+     * (or replaced by a new one, cancelling an unfinished one) and a dedicated thread is started. That thread
+     * waits for the marshaller cache preload future and, for a positive rebalance order, for the ordered
+     * preload future before demanding partitions. When the delay is positive, a timeout object is registered
+     * that forces a preload exchange once the delay elapses. A negative delay without {@code force} does nothing.
+     *
+     * @param assigns Assignments.
+     * @param force {@code True} if dummy reassign.
+     * @throws IgniteCheckedException If failed.
+     */
+    void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Adding partition assignments: " + assigns);
+
+        long delay = cctx.config().getRebalanceDelay();
+
+        if (delay == 0 || force) {
+            assert assigns != null;
+
+            final AffinityTopologyVersion topVer = assigns.topologyVersion();
+
+            SyncFuture fut = syncFut;
+
+            // An already initialized future belongs to previous assignments: cancel it if unfinished and replace.
+            if (fut.isInited()) {
+                if (!fut.isDone())
+                    fut.onCancel();
+
+                fut = new SyncFuture(assigns);
+
+                syncFut = fut;
+            }
+            else
+                fut.init(assigns);
+
+            if (assigns.isEmpty()) {
+                fut.onDone();
+
+                return;
+            }
+
+            if (topologyChanged(topVer)) {
+                fut.onCancel();
+
+                return;
+            }
+
+            final SyncFuture cSF = fut;
+
+            new IgniteThread(cctx.gridName(), "demand-thread-" + cctx.cache().name(), new Runnable() {
+                @Override public void run() {
+                    if (!CU.isMarshallerCache(cctx.name())) {
+                        if (log.isDebugEnabled())
+                            log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
+
+                        try {
+                            IgniteInternalFuture<?> marshFut =
+                                cctx.kernalContext().cache().marshallerCache().preloader().syncFuture();
+
+                            if (!topologyChanged(topVer))
+                                marshFut.get();
+                            else {
+                                cSF.onCancel();
+
+                                return;
+                            }
+                        }
+                        catch (IgniteInterruptedCheckedException ignored) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
+                                    "[cacheName=" + cctx.name() + ']');
+
+                            // Must cancel and stop regardless of the log level.
+                            cSF.onCancel();
+
+                            return;
+                        }
+                        catch (IgniteCheckedException e) {
+                            cSF.onCancel();
+
+                            throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
+                        }
+                    }
+
+                    int rebalanceOrder = cctx.config().getRebalanceOrder();
+
+                    if (rebalanceOrder > 0) {
+                        IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
+
+                        try {
+                            if (fut != null) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
+                                        ", rebalanceOrder=" + rebalanceOrder + ']');
+
+                                if (!topologyChanged(topVer))
+                                    fut.get();
+                                else {
+                                    cSF.onCancel();
+
+                                    return;
+                                }
+                            }
+                        }
+                        catch (IgniteInterruptedCheckedException ignored) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
+                                    "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
+
+                            // Must cancel and stop regardless of the log level.
+                            cSF.onCancel();
+
+                            return;
+                        }
+                        catch (IgniteCheckedException e) {
+                            cSF.onCancel();
+
+                            throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
+                        }
+                    }
+
+                    requestPartitions(cSF);
+                }
+            }).start();
+
+        }
+        else if (delay > 0) {
+            // Replace a previously scheduled delayed rebalance, if any.
+            GridTimeoutObject obj = lastTimeoutObj.get();
+
+            if (obj != null)
+                cctx.time().removeTimeoutObject(obj);
+
+            final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
+
+            assert exchFut != null : "Delaying rebalance process without topology event.";
+
+            obj = new GridTimeoutObjectAdapter(delay) {
+                @Override public void onTimeout() {
+                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
+                            cctx.shared().exchange().forcePreloadExchange(exchFut);
+                        }
+                    });
+                }
+            };
+
+            lastTimeoutObj.set(obj);
+
+            cctx.time().addTimeoutObject(obj);
+        }
+    }
+
+    /**
+     * Sends partition demand requests to every supplier node of the future's assignments.
+     * <p>
+     * For a node whose {@code REBALANCING_VERSION} attribute equals {@code 1}, the demanded partitions are
+     * distributed round-robin over {@code max(1, rebalanceThreadPoolSize / 2)} subsets and a copy of the demand
+     * message is sent for every non-empty subset to the supplier topic with the subset's index. For any other
+     * node a {@code DemandWorker} is run (old API). The future is cancelled and the method returns as soon as a
+     * topology change is detected before processing the next node.
+     *
+     * @param fut Future.
+     */
+    private void requestPartitions(SyncFuture fut) {
+        final GridDhtPreloaderAssignments assigns = fut.assigns;
+
+        AffinityTopologyVersion topVer = fut.topologyVersion();
+
+        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
+            if (topologyChanged(topVer)) {
+                fut.onCancel();
+
+                return;
+            }
+
+            final ClusterNode node = e.getKey();
+
+            GridDhtPartitionDemandMessage d = e.getValue();
+
+            d.timeout(cctx.config().getRebalanceTimeout());
+            d.workerId(0);//old api support.
+
+            final CacheConfiguration cfg = cctx.config();
+
+            final long start = U.currentTimeMillis();
+
+            fut.logStart(node.id(), start);
+
+            U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+                ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
+
+            //Check remote node rebalancing API version.
+            if (Integer.valueOf(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) {
+                GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
+
+                remainings.addAll(d.partitions());
+
+                fut.append(node.id(), remainings);
+
+                int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2);
+
+                List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+
+                for (int cnt = 0; cnt < lsnrCnt; cnt++)
+                    sParts.add(new HashSet<Integer>());
+
+                Iterator<Integer> it = d.partitions().iterator();
+
+                int cnt = 0;
+
+                // Spread partitions round-robin over the listener subsets.
+                while (it.hasNext())
+                    sParts.get(cnt++ % lsnrCnt).add(it.next());
+
+                for (cnt = 0; cnt < lsnrCnt; cnt++) {
+                    Set<Integer> parts = sParts.get(cnt);
+
+                    if (!parts.isEmpty()) {
+                        // Create copy.
+                        GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, parts);
+
+                        // Presumably the topic the supplier replies on - confirm against the supplier side.
+                        initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt));
+
+                        try {
+                            if (!topologyChanged(topVer))
+                                cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
+                            else
+                                fut.onCancel();
+                        }
+                        catch (IgniteCheckedException ex) {
+                            fut.onCancel();
+
+                            U.error(log, "Failed to send partition demand message to node", ex);
+                        }
+
+                        if (log.isDebugEnabled())
+                            log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + parts.size() + " (" + partitionsList(parts) + ")]");
+                    }
+                }
+            }
+            else {
+                // Old API: a demand worker handles the whole message for this node.
+                DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut);
+
+                fut.append(node.id(), d.partitions());
+
+                dw.run(node, d);
+            }
+        }
+    }
+
+    /**
+     * Formats partition IDs as a sorted, comma-separated list in which runs of consecutive IDs are
+     * collapsed into ranges, e.g. {@code [1, 2, 3, 5]} becomes {@code "1-3, 5"}.
+     *
+     * @param c Partitions.
+     * @return String representation of partitions list, empty string for an empty collection.
+     */
+    private String partitionsList(Collection<Integer> c) {
+        List<Integer> sorted = new ArrayList<>(c);
+
+        Collections.sort(sorted);
+
+        StringBuilder sb = new StringBuilder();
+
+        int size = sorted.size();
+
+        for (int i = 0; i < size; ) {
+            int first = sorted.get(i);
+            int last = first;
+
+            // Extend the run while the next ID equals or directly follows the current one.
+            while (++i < size && sorted.get(i) <= last + 1)
+                last = sorted.get(i);
+
+            if (sb.length() > 0)
+                sb.append(", ");
+
+            sb.append(first);
+
+            if (first != last)
+                sb.append("-").append(last);
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * Processes a supply message received from a supplier node: stores the supplied entries into local
+     * partitions that are in {@code MOVING} state, takes ownership of partitions for which the message is
+     * the last one, registers missed partitions and, if the current sync future is still not done, sends
+     * the next demand message to the same node.
+     * <p>
+     * The whole future is cancelled when the topology version differs from the one of the supply message,
+     * when the node leaves during processing or when the follow-up demand cannot be sent due to a topology
+     * change. Only the supplier node's part is cancelled on class loading errors and on other checked failures.
+     *
+     * @param idx Index used to pick the demander and supplier topics of the follow-up demand message.
+     * @param id Node id.
+     * @param supply Supply.
+     */
+    public void handleSupplyMessage(
+        int idx,
+        final UUID id,
+        final GridDhtPartitionSupplyMessageV2 supply) {
+        AffinityTopologyVersion topVer = supply.topologyVersion();
+
+        // NOTE(review): this is whichever future is current now, not necessarily the one the demand
+        // was issued for - presumably the topology check below covers stale messages; confirm.
+        final SyncFuture fut = syncFut;
+
+        if (topologyChanged(topVer)) {
+            fut.onCancel();
+
+            return;
+        }
+
+        ClusterNode node = cctx.node(id);
+
+        assert node != null;
+
+        if (log.isDebugEnabled())
+            log.debug("Received supply message: " + supply);
+
+        // Check whether there were class loading errors on unmarshal
+        if (supply.classError() != null) {
+            if (log.isDebugEnabled())
+                log.debug("Class got undeployed during preloading: " + supply.classError());
+
+            fut.onCancel(id);
+
+            return;
+        }
+
+        final GridDhtPartitionTopology top = cctx.dht().topology();
+
+        try {
+            // Preload.
+            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+                if (topologyChanged(topVer)) {
+                    fut.onCancel();
+
+                    return;
+                }
+
+                int p = e.getKey();
+
+                if (cctx.affinity().localNode(p, topVer)) {
+                    GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+                    assert part != null;
+
+                    if (part.state() == MOVING) {
+                        boolean reserved = part.reserve();
+
+                        assert reserved : "Failed to reserve partition [gridName=" +
+                            cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+                        part.lock();
+
+                        try {
+                            // Loop through all received entries and try to preload them.
+                            for (GridCacheEntryInfo entry : e.getValue().infos()) {
+                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Preloading is not permitted for entry due to " +
+                                            "evictions [key=" + entry.key() +
+                                            ", ver=" + entry.version() + ']');
+
+                                    continue;
+                                }
+                                // A false result means the partition became invalid: skip its remaining entries.
+                                if (!preloadEntry(node, p, entry, topVer)) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got entries for invalid partition during " +
+                                            "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+
+                                    break;
+                                }
+                            }
+
+                            boolean last = supply.last().contains(p);
+
+                            // If message was last for this partition,
+                            // then we take ownership.
+                            if (last) {
+                                top.own(part);
+
+                                fut.onPartitionDone(id, p);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Finished rebalancing partition: " + part);
+                            }
+                        }
+                        finally {
+                            // Always undo lock() and reserve(), also when preloading fails.
+                            part.unlock();
+                            part.release();
+                        }
+                    }
+                    else {
+                        fut.onPartitionDone(id, p);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+                    }
+                }
+                else {
+                    fut.onPartitionDone(id, p);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+                }
+            }
+
+            // Only request partitions based on latest topology version.
+            for (Integer miss : supply.missed())
+                if (cctx.affinity().localNode(miss, topVer))
+                    fut.onMissedPartition(id, miss);
+
+            // Every missed partition is also reported as done for this node.
+            for (Integer miss : supply.missed())
+                fut.onPartitionDone(id, miss);
+
+            if (!fut.isDone()) {
+                GridDhtPartitionDemandMessage d = fut.getDemandMessage(node);
+
+                if (d != null) {
+                    // Create copy.
+                    GridDhtPartitionDemandMessage nextD =
+                        new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
+
+                    nextD.topic(GridCachePartitionExchangeManager.demanderTopic(idx));
+
+                    if (!topologyChanged(topVer)) {
+                        // Send demand message.
+                        cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(idx),
+                            nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+                    }
+                    else
+                        fut.onCancel();
+                }
+            }
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Node left during rebalancing [node=" + node.id() +
+                    ", msg=" + e.getMessage() + ']');
+            fut.onCancel();
+        }
+        catch (IgniteCheckedException ex) {
+            U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+                "fully finish) [node=" + node.id() + ", msg=" + supply + ']', ex);
+
+            fut.onCancel(node.id());
+        }
+    }
+
+    /**
+     * Stores a single supplied entry in the local DHT cache.
+     * <p>
+     * The entry is skipped (and {@code true} is still returned) when the IGFS data cache has exceeded its
+     * maximum space, when the preload predicate rejects it, when it is already present in the cache or when
+     * it has been concurrently removed.
+     *
+     * @param pick Node picked for preloading.
+     * @param p Partition.
+     * @param entry Preloaded entry.
+     * @param topVer Topology version.
+     * @return {@code False} if partition has become invalid during preloading.
+     * @throws IgniteCheckedException If the entry could not be cached; an
+     *      {@link IgniteInterruptedCheckedException} is rethrown as is, any other failure is wrapped.
+     */
+    private boolean preloadEntry(
+        ClusterNode pick,
+        int p,
+        GridCacheEntryInfo entry,
+        AffinityTopologyVersion topVer
+    ) throws IgniteCheckedException {
+        try {
+            GridCacheEntryEx cached = null;
+
+            try {
+                cached = cctx.dht().entryEx(entry.key());
+
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
+
+                if (cctx.dht().isIgfsDataCache() &&
+                    cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
+                    LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
+                        "value, will ignore rebalance entries)");
+
+                    if (cached.markObsoleteIfEmpty(null))
+                        cached.context().cache().removeIfObsolete(cached.key());
+
+                    return true;
+                }
+
+                if (preloadPred == null || preloadPred.apply(entry)) {
+                    if (cached.initialValue(
+                        entry.value(),
+                        entry.version(),
+                        entry.ttl(),
+                        entry.expireTime(),
+                        true,
+                        topVer,
+                        cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+                    )) {
+                        cctx.evicts().touch(cached, topVer); // Start tracking.
+
+                        if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
+                            cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
+                                (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
+                                false, null, null, null);
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
+                            ", part=" + p + ']');
+                }
+                else if (log.isDebugEnabled())
+                    log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                // Use the supplied entry's key: 'cached' is still null if entryEx() itself failed.
+                if (log.isDebugEnabled())
+                    log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
+                        entry.key() + ", part=" + p + ']');
+            }
+            catch (GridDhtInvalidPartitionException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Partition became invalid during rebalancing (will ignore): " + p);
+
+                return false;
+            }
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            // Propagate interruption unwrapped so callers can tell it apart from other failures.
+            throw e;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
+                cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtPartitionDemander.class, this);
+    }
+
+    /**
+     * Remembers the given exchange future in {@code lastExchangeFut}.
+     *
+     * @param lastFut Exchange future to store as the last one.
+     */
+    void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
+        this.lastExchangeFut = lastFut;
+    }
+
+    /**
+     *
+     */
+    public class SyncFuture extends GridFutureAdapter<Boolean> {
+        /** */
+        private static final long serialVersionUID = 1L;
+
+        /** Remaining. */
+        private ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new ConcurrentHashMap8<>();
+
+        /** Missed. */
+        private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
+
+        /** Started. */
+        private ConcurrentHashMap8<UUID, Long> started = new ConcurrentHashMap8<>();
+
+        /** Lock. */
+        private Lock lock = new ReentrantLock();
+
+        /** Listener. */
+        private volatile GridLocalEventListener lsnr;
+
+        /** Assignments. */
+        private volatile GridDhtPreloaderAssignments assigns;
+
+        /** Completed. */
+        private volatile boolean completed = true;
+
+        /**
+         * @param assigns Assigns.
+         */
+        SyncFuture(GridDhtPreloaderAssignments assigns) {
+            this.assigns = assigns;
+        }
+
+        /**
+         * @return Topology version.
+         */
+        public AffinityTopologyVersion topologyVersion() {
+            return assigns != null ? assigns.topologyVersion() : null;
+        }
+
+        /**
+         * @param assigns Assigns.
+         */
+        void init(GridDhtPreloaderAssignments assigns) {
+            final SyncFuture fut = this;
+
+            lsnr = new GridLocalEventListener() {
+                @Override public void onEvent(Event evt) {
+                    fut.onCancel();
+                }
+            };
+
+            cctx.events().addListener(lsnr, EVT_NODE_FAILED);
+
+            this.assigns = assigns;
+        }
+
+        /**
+         * @return Initialised or not.
+         */
+        boolean isInited() {
+            return assigns != null;
+        }
+
+        /**
+         * @param nodeId Node id.
+         * @param parts Parts.
+         */
+        void append(UUID nodeId, Collection<Integer> parts) {
+            remaining.put(nodeId, parts);
+
+            missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+        }
+
+        /**
+         * @param nodeId Node id.
+         * @param time Time.
+         */
+        void logStart(UUID nodeId, long time) {
+            started.put(nodeId, time);
+        }
+
+        /**
+         * @param node Node.
+         */
+        GridDhtPartitionDemandMessage getDemandMessage(ClusterNode node) {
+            if (isDone())
+                return null;
+
+            return assigns.get(node);
+        }
+
+        /**
+         *
+         */
+        void onCancel() {
+            lock.lock();
+            try {
+                if (isDone())
+                    return;
+
+                remaining.clear();
+
+                completed = false;
+
+                U.log(log, (!completed ? "Cancelled" : "Completed") + " rebalancing from all nodes [cache=" + cctx.name()
+                    + ", topology=" + topologyVersion() +
+                    ", time=" +
+                    (started.isEmpty() ? 0 : (U.currentTimeMillis() - Collections.min(started.values()))) + " ms]");
+
+                checkIsDone();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        /**
+         * @param nodeId Node id.
+         */
+        void onCancel(UUID nodeId) {
+            lock.lock();
+            try {
+                if (isDone())
+                    return;
+
+                remaining.remove(nodeId);
+
+                completed = false;
+
+                U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+                    ", from node=" + nodeId + ", topology=" + topologyVersion() +
+                    ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]"));
+
+                checkIsDone();
+            }
+            finally {
+                lock.unlock();
+            }
+
+        }
+
+        /**
+         * @return Is completed.
+         */
+        boolean isCompleted() {
+            return completed;
+        }
+
+        /**
+         * @param nodeId Node id.
+         * @param p P.
+         */
+        void onMissedPartition(UUID nodeId, int p) {
+            lock.lock();
+            try {
+                if (isDone())
+                    return;
+
+                if (missed.get(nodeId) == null)
+                    missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+
+                missed.get(nodeId).add(p);
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        /**
+         * @param nodeId Node id.
+         * @param p P.
+         */
+        void onPartitionDone(UUID nodeId, int p) {
+            lock.lock();
+            try {
+                if (isDone())
+                    return;
+
+                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                    preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+                        assigns.exchangeFuture().discoveryEvent());
+
+                Collection<Integer> parts = remaining.get(nodeId);
+
+                if (parts != null) {
+                    parts.remove(p);
+
+                    if (parts.isEmpty()) {
+                        remaining.remove(nodeId);
+
+                        U.log(log, ("Completed rebalancing [cache=" + cctx.name() +
+                            ", from node=" + nodeId + ", topology=" + topologyVersion() +
+                            ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]"));
+                    }
+                }
+
+                checkIsDone();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        /**
+         *
+         */
+        private void checkIsDone() {
+            if (remaining.isEmpty()) {
+                if (log.isDebugEnabled())
+                    log.debug("Completed sync future.");
+
+                Collection<Integer> m = new HashSet<>();
+
+                for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
+                    if (e.getValue() != null && !e.getValue().isEmpty())
+                        m.addAll(e.getValue());
+                }
+
+                if (!m.isEmpty()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Reassigning partitions that were missed: " + m);
+
+                    cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture());
+                }
+
+                cctx.shared().exchange().scheduleResendPartitions();
+
+                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED))
+                    preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent());
+
+                if (lsnr != null)
+                    cctx.events().removeListener(lsnr);
+
+                onDone(completed);
+
+                missed.clear();
+                remaining.clear();
+                started.clear();
+                assigns.clear();
+            }
+        }
+    }
+
+    /**
+     * Supply message wrapper.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private static class SupplyMessage {
+        /** Sender ID. */
+        private UUID sndId;
+
+        /** Supply message. */
+        private GridDhtPartitionSupplyMessage supply;
+
+        /**
+         * Dummy constructor.
+         */
+        private SupplyMessage() {
+            // No-op.
+        }
+
+        /**
+         * @param sndId Sender ID.
+         * @param supply Supply message.
+         */
+        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
+            this.sndId = sndId;
+            this.supply = supply;
+        }
+
+        /**
+         * @return Sender ID.
+         */
+        UUID senderId() {
+            return sndId;
+        }
+
+        /**
+         * @return Message.
+         */
+        GridDhtPartitionSupplyMessage supply() {
+            return supply;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SupplyMessage.class, this);
+        }
+    }
+
+    /**
+     * DemandWorker index counter.
+     * NOTE(review): presumably used to hand out IDs to {@code DemandWorker} instances; the usage is not
+     * visible in this part of the file - confirm against the code that creates the workers.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private final AtomicInteger dmIdx = new AtomicInteger();
+
+    /**
+     * Demand worker kept for backward compatibility. It sends a demand message to a supplier node, receives
+     * supply messages through an ordered handler into a local queue and preloads the received entries,
+     * reporting progress to the {@link SyncFuture} it was created with.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private class DemandWorker {
+        /** Worker ID. */
+        private int id;
+
+        /** Partition-to-node assignments. */
+        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
+
+        /** Message queue. */
+        private final LinkedBlockingDeque<SupplyMessage> msgQ =
+            new LinkedBlockingDeque<>();
+
+        /** Counter used to build a unique topic for every demand attempt. */
+        private long cntr;
+
+        /** Hide worker logger and use cache logger instead. */
+        private IgniteLogger log = GridDhtPartitionDemander.this.log;
+
+        /** Future notified about done, missed and cancelled partitions. */
+        private volatile SyncFuture fut;
+
+        /**
+         * @param id Worker ID.
+         * @param fut Future to report rebalancing progress to.
+         */
+        private DemandWorker(int id, SyncFuture fut) {
+            assert id >= 0;
+
+            this.id = id;
+            this.fut = fut;
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void addMessage(SupplyMessage msg) {
+            msgQ.offer(msg);
+        }
+
+        /**
+         * @param deque Deque to poll from.
+         * @param time Time to wait.
+         * @return Polled item.
+         * @throws InterruptedException If interrupted.
+         */
+        @Nullable private <T> T poll(BlockingQueue<T> deque, long time) throws InterruptedException {
+            return deque.poll(time, MILLISECONDS);
+        }
+
+        /**
+         * @param idx Unique index for this topic.
+         * @return Topic for partition.
+         */
+        public Object topic(long idx) {
+            return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
+        }
+
+        /**
+         * Registers an ordered handler that puts every supply message received on the topic
+         * into the worker's message queue.
+         *
+         * @param topic Topic to listen on.
+         */
+        private void registerSupplyHandler(Object topic) {
+            cctx.io().addOrderedHandler(topic, new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
+                    addMessage(new SupplyMessage(nodeId, msg));
+                }
+            });
+        }
+
+        /**
+         * Sends the demand message to the node and processes supply messages until all demanded partitions
+         * are handled or the topology changes. The demand is re-sent when waiting for a supply message times
+         * out, when the supplier reports a class loading error, or when the supplier asks for an
+         * acknowledgement.
+         *
+         * @param node Node to demand from.
+         * @param topVer Topology version.
+         * @param d Demand message.
+         * @param exchFut Exchange future.
+         * @return Missed partitions.
+         * @throws InterruptedException If interrupted.
+         * @throws ClusterTopologyCheckedException If node left.
+         * @throws IgniteCheckedException If failed to send message.
+         */
+        private Set<Integer> demandFromNode(
+            ClusterNode node,
+            final AffinityTopologyVersion topVer,
+            GridDhtPartitionDemandMessage d,
+            GridDhtPartitionsExchangeFuture exchFut
+        ) throws InterruptedException, IgniteCheckedException {
+            GridDhtPartitionTopology top = cctx.dht().topology();
+
+            cntr++;
+
+            d.topic(topic(cntr));
+            d.workerId(id);
+
+            Set<Integer> missed = new HashSet<>();
+
+            // Get the same collection that will be sent in the message.
+            Collection<Integer> remaining = d.partitions();
+
+            if (topologyChanged(topVer))
+                return missed;
+
+            registerSupplyHandler(d.topic());
+
+            try {
+                boolean retry;
+
+                // DoWhile.
+                // =======
+                do {
+                    retry = false;
+
+                    // Create copy.
+                    d = new GridDhtPartitionDemandMessage(d, remaining);
+
+                    long timeout = cctx.config().getRebalanceTimeout();
+
+                    d.timeout(timeout);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
+
+                    // Send demand message.
+                    cctx.io().send(node, d, cctx.ioPolicy());
+
+                    // While.
+                    // =====
+                    while (!topologyChanged(topVer)) {
+                        SupplyMessage s = poll(msgQ, timeout);
+
+                        // If timed out.
+                        if (s == null) {
+                            if (msgQ.isEmpty()) { // Safety check.
+                                U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
+                                    " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
+                                    " configuration properties).");
+
+                                // Ordered listener was removed if timeout expired.
+                                cctx.io().removeOrderedHandler(d.topic());
+
+                                // Must create copy to be able to work with IO manager thread local caches.
+                                d = new GridDhtPartitionDemandMessage(d, remaining);
+
+                                // Create new topic.
+                                d.topic(topic(++cntr));
+
+                                // Create new ordered listener.
+                                registerSupplyHandler(d.topic());
+
+                                // Resend message.
+                                retry = true;
+
+                                break; // While.
+                            }
+                            else
+                                continue; // While.
+                        }
+
+                        // Check that message was received from expected node.
+                        if (!s.senderId().equals(node.id())) {
+                            U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
+                                ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
+
+                            continue; // While.
+                        }
+
+                        if (log.isDebugEnabled())
+                            log.debug("Received supply message: " + s);
+
+                        GridDhtPartitionSupplyMessage supply = s.supply();
+
+                        // Check whether there were class loading errors on unmarshal
+                        if (supply.classError() != null) {
+                            if (log.isDebugEnabled())
+                                log.debug("Class got undeployed during preloading: " + supply.classError());
+
+                            retry = true;
+
+                            // Quit preloading.
+                            break;
+                        }
+
+                        // Preload.
+                        for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+                            int p = e.getKey();
+
+                            if (cctx.affinity().localNode(p, topVer)) {
+                                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+                                assert part != null;
+
+                                if (part.state() == MOVING) {
+                                    boolean reserved = part.reserve();
+
+                                    assert reserved : "Failed to reserve partition [gridName=" +
+                                        cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+                                    part.lock();
+
+                                    try {
+                                        Collection<Integer> invalidParts = new GridLeanSet<>();
+
+                                        // Loop through all received entries and try to preload them.
+                                        for (GridCacheEntryInfo entry : e.getValue().infos()) {
+                                            if (!invalidParts.contains(p)) {
+                                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
+                                                    if (log.isDebugEnabled())
+                                                        log.debug("Preloading is not permitted for entry due to " +
+                                                            "evictions [key=" + entry.key() +
+                                                            ", ver=" + entry.version() + ']');
+
+                                                    continue;
+                                                }
+
+                                                if (!preloadEntry(node, p, entry, topVer)) {
+                                                    invalidParts.add(p);
+
+                                                    if (log.isDebugEnabled())
+                                                        log.debug("Got entries for invalid partition during " +
+                                                            "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+                                                }
+                                            }
+                                        }
+
+                                        boolean last = supply.last().contains(p);
+
+                                        // If message was last for this partition,
+                                        // then we take ownership.
+                                        if (last) {
+                                            remaining.remove(p);
+                                            fut.onPartitionDone(node.id(), p);
+
+                                            top.own(part);
+
+                                            if (log.isDebugEnabled())
+                                                log.debug("Finished rebalancing partition: " + part);
+
+                                            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                                                preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+                                                    exchFut.discoveryEvent());
+                                        }
+                                    }
+                                    finally {
+                                        part.unlock();
+                                        part.release();
+                                    }
+                                }
+                                else {
+                                    remaining.remove(p);
+                                    fut.onPartitionDone(node.id(), p);
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+                                }
+                            }
+                            else {
+                                remaining.remove(p);
+                                fut.onPartitionDone(node.id(), p);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+                            }
+                        }
+
+                        remaining.removeAll(s.supply().missed());
+
+                        // Only request partitions based on latest topology version.
+                        for (Integer miss : s.supply().missed()) {
+                            if (cctx.affinity().localNode(miss, topVer))
+                                missed.add(miss);
+
+                            fut.onMissedPartition(node.id(), miss);
+                        }
+
+                        if (remaining.isEmpty())
+                            break; // While.
+
+                        if (s.supply().ack()) {
+                            retry = true;
+
+                            break;
+                        }
+                    }
+                }
+                while (retry && !topologyChanged(topVer));
+
+                return missed;
+            }
+            finally {
+                cctx.io().removeOrderedHandler(d.topic());
+            }
+        }
+
+        /**
+         * Demands partitions from the given node while holding the demander's read lock. The future is
+         * cancelled entirely if the topology has changed, the node left or the thread was interrupted, and
+         * only for this node if the demand failed with another checked exception.
+         *
+         * @param node Node.
+         * @param d D.
+         */
+        public void run(ClusterNode node, GridDhtPartitionDemandMessage d) {
+            demandLock.readLock().lock();
+
+            try {
+                GridDhtPartitionsExchangeFuture exchFut = fut.assigns.exchangeFuture();
+
+                AffinityTopologyVersion topVer = fut.assigns.topologyVersion();
+
+                if (topologyChanged(topVer)) {
+                    fut.onCancel();
+
+                    return;
+                }
+
+                try {
+                    Set<Integer> set = demandFromNode(node, topVer, d, exchFut);
+
+                    if (!set.isEmpty()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" +
+                                set + ']');
+                    }
+                }
+                catch (ClusterTopologyCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
+                            ", msg=" + e.getMessage() + ']');
+
+                    fut.onCancel();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
+
+                    fut.onCancel(node.id());
+                }
+                catch (InterruptedException ignored) {
+                    fut.onCancel();
+
+                    // Restore the interrupt status so that the caller can observe the interruption.
+                    Thread.currentThread().interrupt();
+                }
+            }
+            finally {
+                demandLock.readLock().unlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
new file mode 100644
index 0000000..0686376
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -0,0 +1,783 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jsr166.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Thread pool for supplying partitions to demanding nodes.
+ */
+class GridDhtPartitionSupplier {
+    /** Cache context this supplier was created for. */
+    private final GridCacheContext<?, ?> cctx;
+
+    /** Logger obtained from the cache context for this class. */
+    private final IgniteLogger log;
+
+    /** DHT partition topology taken from the cache context; reset to {@code null} by {@code stop()}. */
+    private GridDhtPartitionTopology top;
+
+    /** Value of {@code cctx.gridDeploy().enabled()} captured at construction time. */
+    private final boolean depEnabled;
+
+    /** Preload predicate. */
+    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+    /** Supply context map, keyed by a (demander node ID, demand topic) pair. */
+    private ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
+
+    /** Done map, keyed like the supply context map; a demand whose key is present here is ignored. */
+    private ConcurrentHashMap8<T2, Boolean> doneMap = new ConcurrentHashMap8<>();
+
+    /**
+     * Creates a supplier for the given cache, capturing its logger, DHT topology and
+     * deployment-enabled flag.
+     *
+     * @param cctx Cache context, must not be {@code null}.
+     */
+    GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) {
+        assert cctx != null;
+
+        this.cctx = cctx;
+        this.log = cctx.logger(getClass());
+        this.top = cctx.dht().topology();
+        this.depEnabled = cctx.gridDeploy().enabled();
+    }
+
+    /**
+     * Starts the supplier by delegating to {@code startOldListeners()}.
+     */
+    void start() {
+        this.startOldListeners();
+    }
+
+    /**
+     * Stops the supplier by dropping its reference to the partition topology.
+     */
+    void stop() {
+        this.top = null;
+    }
+
+    /**
+     * Replaces the predicate applied to entries before they are added to a supply message.
+     *
+     * @param preloadPred New preload predicate, {@code null} to supply every entry.
+     */
+    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+        IgnitePredicate<GridCacheEntryInfo> pred = preloadPred;
+
+        this.preloadPred = pred;
+    }
+
+    /**
+     * @param d Demand message.
+     * @param id Node uuid.
+     */
+    public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d) {
+        assert d != null;
+        assert id != null;
+
+        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
+            return;
+
+        GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(d.workerId(),
+            d.updateSequence(), cctx.cacheId(), d.topologyVersion());
+
+        long preloadThrottle = cctx.config().getRebalanceThrottle();
+
+        ClusterNode node = cctx.discovery().node(id);
+
+        T2<UUID, Object> scId = new T2<>(id, d.topic());
+
+        try {
+            if (!d.partitions().isEmpty()) {//Only initial request contains partitions.
+                doneMap.remove(scId);
+                scMap.remove(scId);
+            }
+
+            SupplyContext sctx = scMap.remove(scId);
+
+            if (doneMap.get(scId) != null)
+                return;
+
+            long bCnt = 0;
+
+            int phase = 0;
+
+            boolean newReq = true;
+
+            long maxBatchesCnt = cctx.config().getRebalanceBatchesCount();
+
+            if (sctx != null) {
+                phase = sctx.phase;
+
+                maxBatchesCnt = 1;
+            }
+
+            Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
+
+            while ((sctx != null && newReq) || partIt.hasNext()) {
+                int part = sctx != null && newReq ? sctx.part : partIt.next();
+
+                newReq = false;
+
+                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+
+                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                    // Reply with partition of "-1" to let sender know that
+                    // this node is no longer an owner.
+                    s.missed(part);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Requested partition is not owned by local node [part=" + part +
+                            ", demander=" + id + ']');
+
+                    continue;
+                }
+
+                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+                try {
+                    if (phase == 0 && cctx.isSwapOrOffheapEnabled()) {
+                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+                        cctx.swap().addOffHeapListener(part, swapLsnr);
+                        cctx.swap().addSwapListener(part, swapLsnr);
+                    }
+
+                    boolean partMissing = false;
+
+                    if (phase == 0)
+                        phase = 1;
+
+                    if (phase == 1) {
+                        Iterator<GridDhtCacheEntry> entIt = sctx != null ?
+                            (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();
+
+                        while (entIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition, so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition [part=" + part +
+                                        ", nodeId=" + id + ']');
+
+                                partMissing = true;
+
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (!reply(node, d, s))
+                                    return;
+
+                                // Throttle preloading.
+                                if (preloadThrottle > 0)
+                                    U.sleep(preloadThrottle);
+
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
+
+                                    swapLsnr = null;
+
+                                    return;
+                                }
+                                else {
+                                    s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+                                        cctx.cacheId(), d.topologyVersion());
+                                }
+                            }
+
+                            GridCacheEntryEx e = entIt.next();
+
+                            GridCacheEntryInfo info = e.info();
+
+                            if (info != null && !info.isNew()) {
+                                if (preloadPred == null || preloadPred.apply(info))
+                                    s.addEntry(part, info, cctx);
+                                else if (log.isDebugEnabled())
+                                    log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                        info);
+                            }
+                        }
+
+                        if (partMissing)
+                            continue;
+
+                    }
+
+                    if (phase == 1)
+                        phase = 2;
+
+                    if (phase == 2 && cctx.isSwapOrOffheapEnabled()) {
+                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = sctx != null ?
+                            (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
+                            cctx.swap().iterator(part);
+
+                        // Iterator may be null if space does not exist.
+                        if (iter != null) {
+                            try {
+                                boolean prepared = false;
+
+                                while (iter.hasNext()) {
+                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                        // Demander no longer needs this partition,
+                                        // so we send '-1' partition and move on.
+                                        s.missed(part);
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Demanding node does not need requested partition " +
+                                                "[part=" + part + ", nodeId=" + id + ']');
+
+                                        partMissing = true;
+
+                                        break; // For.
+                                    }
+
+                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                        if (!reply(node, d, s))
+                                            return;
+
+                                        // Throttle preloading.
+                                        if (preloadThrottle > 0)
+                                            U.sleep(preloadThrottle);
+
+                                        if (++bCnt >= maxBatchesCnt) {
+                                            saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
+
+                                            swapLsnr = null;
+
+                                            return;
+                                        }
+                                        else {
+                                            s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+                                                cctx.cacheId(), d.topologyVersion());
+                                        }
+                                    }
+
+                                    Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
+
+                                    GridCacheSwapEntry swapEntry = e.getValue();
+
+                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                                    info.keyBytes(e.getKey());
+                                    info.ttl(swapEntry.ttl());
+                                    info.expireTime(swapEntry.expireTime());
+                                    info.version(swapEntry.version());
+                                    info.value(swapEntry.value());
+
+                                    if (preloadPred == null || preloadPred.apply(info))
+                                        s.addEntry0(part, info, cctx);
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Rebalance predicate evaluated to false (will not send " +
+                                                "cache entry): " + info);
+
+                                        continue;
+                                    }
+
+                                    // Need to manually prepare cache message.
+                                    if (depEnabled && !prepared) {
+                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+                                            swapEntry.valueClassLoaderId() != null ?
+                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+                                                null;
+
+                                        if (ldr == null)
+                                            continue;
+
+                                        if (ldr instanceof GridDeploymentInfo) {
+                                            s.prepare((GridDeploymentInfo)ldr);
+
+                                            prepared = true;
+                                        }
+                                    }
+                                }
+
+                                if (partMissing)
+                                    continue;
+                            }
+                            finally {
+                                iter.close();
+                            }
+                        }
+                    }
+
+                    if (swapLsnr == null && sctx != null)
+                        swapLsnr = sctx.swapLsnr;
+
+                    // Stop receiving promote notifications.
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+
+                    if (phase == 2)
+                        phase = 3;
+
+                    if (phase == 3 && swapLsnr != null) {
+                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+                        swapLsnr = null;
+
+                        Iterator<GridCacheEntryInfo> lsnrIt = sctx != null ?
+                            (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();
+
+                        while (lsnrIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition,
+                                // so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition " +
+                                        "[part=" + part + ", nodeId=" + id + ']');
+
+                                // No need to continue iteration over swap entries.
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (!reply(node, d, s))
+                                    return;
+
+                                // Throttle preloading.
+                                if (preloadThrottle > 0)
+                                    U.sleep(preloadThrottle);
+
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
+
+                                    return;
+                                }
+                                else {
+                                    s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+                                        cctx.cacheId(), d.topologyVersion());
+                                }
+                            }
+
+                            GridCacheEntryInfo info = lsnrIt.next();
+
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    // Mark as last supply message.
+                    s.last(part);
+
+                    phase = 0;
+
+                    sctx = null;
+                }
+                finally {
+                    loc.release();
+
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+                }
+            }
+
+            reply(node, d, s);
+
+            doneMap.put(scId, true);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send partition supply message to node: " + id, e);
+        }
+    }
+
+    /**
+     * Sends a supply message to the demanding node as an ordered message on the topic taken
+     * from the demand message, using the demand message timeout.
+     *
+     * @param n Node.
+     * @param d Demand message.
+     * @param s Supply message.
+     * @return {@code True} if message was sent, {@code false} if recipient left grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessageV2 s)
+        throws IgniteCheckedException {
+        boolean sent;
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+            }
+
+            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+            sent = true;
+        }
+        catch (ClusterTopologyCheckedException nodeLeft) {
+            // Topology exception is not propagated: it is reported to the caller as 'false'.
+            if (log.isDebugEnabled()) {
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
+            }
+
+            sent = false;
+        }
+
+        return sent;
+    }
+
+    /**
+     * Stores a new {@link SupplyContext} built from the given state in {@code scMap}.
+     *
+     * @param t Tuple used as the key in {@code scMap}.
+     * @param phase Phase.
+     * @param partIt Partition iterator.
+     * @param part Partition.
+     * @param entryIt Entry iterator.
+     * @param swapLsnr Swap listener.
+     */
+    private void saveSupplyContext(T2 t, int phase, Iterator<Integer> partIt, int part, Iterator<?> entryIt,
+        GridCacheEntryInfoCollectSwapListener swapLsnr) {
+        SupplyContext saved = new SupplyContext(phase, partIt, entryIt, swapLsnr, part);
+
+        scMap.put(t, saved);
+    }
+
+    /**
+     * Supply context: a snapshot of supplying state (phase, partition and the iterators in use)
+     * that is stored via {@code saveSupplyContext(...)} so the values can be read back later.
+     */
+    private static class SupplyContext {
+        /** Phase that was in progress when the context was saved. */
+        private int phase;
+
+        /** Partition iterator. */
+        private Iterator<Integer> partIt;
+
+        /** Entry iterator; the element type depends on the saved phase, hence the wildcard. */
+        private Iterator<?> entryIt;
+
+        /** Swap listener (may be {@code null}). */
+        private GridCacheEntryInfoCollectSwapListener swapLsnr;
+
+        /** Partition. */
+        private int part;
+
+        /**
+         * @param phase Phase.
+         * @param partIt Partition iterator.
+         * @param entryIt Entry iterator.
+         * @param swapLsnr Swap listener.
+         * @param part Partition.
+         */
+        public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
+            GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
+            this.phase = phase;
+            this.partIt = partIt;
+            this.entryIt = entryIt;
+            this.swapLsnr = swapLsnr;
+            this.part = part;
+        }
+    }
+
+    /**
+     * Registers a handler for {@link GridDhtPartitionDemandMessage} on this cache which forwards
+     * every received message to {@link #processOldDemandMessage(GridDhtPartitionDemandMessage, UUID)}.
+     * Nothing is registered when the local node is a client node.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    public void startOldListeners() {
+        if (cctx.kernalContext().clientNode())
+            return;
+
+        cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class,
+            new CI2<UUID, GridDhtPartitionDemandMessage>() {
+                @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+                    processOldDemandMessage(m, id);
+                }
+            });
+    }
+
+    /**
+     * Handles a demand message of the old (pre-V2) protocol.
+     * <p>
+     * For every demanded partition the method either marks it as missed (partition is absent locally,
+     * is not in {@code OWNING} state, cannot be reserved, or no longer belongs to the demander) or
+     * supplies its data in three steps: entries of the local partition, entries returned by the swap
+     * iterator, and finally entries collected by the swap listener while the first two steps ran.
+     * <p>
+     * Whenever the supply message reaches the configured rebalance batch size it is sent and a fresh
+     * message is started. Once at least one such intermediate batch was sent, the message that
+     * completes the current partition is marked for acknowledgement and the partition loop stops.
+     *
+     * @param d Demand message.
+     * @param id ID of the demanding node.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) {
+        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
+            d.updateSequence(), cctx.cacheId());
+
+        ClusterNode node = cctx.node(id);
+
+        long preloadThrottle = cctx.config().getRebalanceThrottle();
+
+        // Becomes true as soon as an intermediate batch has been sent.
+        boolean ack = false;
+
+        try {
+            for (int part : d.partitions()) {
+                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+
+                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                    // Reply with partition of "-1" to let sender know that
+                    // this node is no longer an owner.
+                    s.missed(part);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Requested partition is not owned by local node [part=" + part +
+                            ", demander=" + id + ']');
+
+                    continue;
+                }
+
+                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+                try {
+                    if (cctx.isSwapOrOffheapEnabled()) {
+                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+                        cctx.swap().addOffHeapListener(part, swapLsnr);
+                        cctx.swap().addSwapListener(part, swapLsnr);
+                    }
+
+                    boolean partMissing = false;
+
+                    // Step 1: entries of the local partition.
+                    for (GridCacheEntryEx e : loc.entries()) {
+                        if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                            // Demander no longer needs this partition, so we send '-1' partition and move on.
+                            s.missed(part);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Demanding node does not need requested partition [part=" + part +
+                                    ", nodeId=" + id + ']');
+
+                            partMissing = true;
+
+                            break;
+                        }
+
+                        if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                            ack = true;
+
+                            if (!replyOld(node, d, s))
+                                return;
+
+                            // Throttle preloading.
+                            if (preloadThrottle > 0)
+                                U.sleep(preloadThrottle);
+
+                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                cctx.cacheId());
+                        }
+
+                        GridCacheEntryInfo info = e.info();
+
+                        if (info != null && !info.isNew()) {
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    if (partMissing)
+                        continue;
+
+                    // Step 2: entries returned by the swap iterator.
+                    if (cctx.isSwapOrOffheapEnabled()) {
+                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+                            cctx.swap().iterator(part);
+
+                        // Iterator may be null if space does not exist.
+                        if (iter != null) {
+                            try {
+                                boolean prepared = false;
+
+                                for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
+                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                        // Demander no longer needs this partition,
+                                        // so we send '-1' partition and move on.
+                                        s.missed(part);
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Demanding node does not need requested partition " +
+                                                "[part=" + part + ", nodeId=" + id + ']');
+
+                                        partMissing = true;
+
+                                        break; // For.
+                                    }
+
+                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                        ack = true;
+
+                                        if (!replyOld(node, d, s))
+                                            return;
+
+                                        // Throttle preloading.
+                                        if (preloadThrottle > 0)
+                                            U.sleep(preloadThrottle);
+
+                                        s = new GridDhtPartitionSupplyMessage(d.workerId(),
+                                            d.updateSequence(), cctx.cacheId());
+                                    }
+
+                                    GridCacheSwapEntry swapEntry = e.getValue();
+
+                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                                    info.keyBytes(e.getKey());
+                                    info.ttl(swapEntry.ttl());
+                                    info.expireTime(swapEntry.expireTime());
+                                    info.version(swapEntry.version());
+                                    info.value(swapEntry.value());
+
+                                    if (preloadPred == null || preloadPred.apply(info))
+                                        s.addEntry0(part, info, cctx);
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Rebalance predicate evaluated to false (will not send " +
+                                                "cache entry): " + info);
+
+                                        continue;
+                                    }
+
+                                    // Need to manually prepare cache message.
+                                    if (depEnabled && !prepared) {
+                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+                                            swapEntry.valueClassLoaderId() != null ?
+                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+                                                null;
+
+                                        if (ldr == null)
+                                            continue;
+
+                                        if (ldr instanceof GridDeploymentInfo) {
+                                            s.prepare((GridDeploymentInfo)ldr);
+
+                                            prepared = true;
+                                        }
+                                    }
+                                }
+
+                                if (partMissing)
+                                    continue;
+                            }
+                            finally {
+                                iter.close();
+                            }
+                        }
+                    }
+
+                    // Stop receiving promote notifications.
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+
+                    // Step 3: entries collected by the swap listener during steps 1 and 2.
+                    if (swapLsnr != null) {
+                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+                        // Listener is already removed above; null it so 'finally' does not remove it again.
+                        swapLsnr = null;
+
+                        for (GridCacheEntryInfo info : entries) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition,
+                                // so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition " +
+                                        "[part=" + part + ", nodeId=" + id + ']');
+
+                                // No need to continue iteration over swap entries.
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                ack = true;
+
+                                if (!replyOld(node, d, s))
+                                    return;
+
+                                s = new GridDhtPartitionSupplyMessage(d.workerId(),
+                                    d.updateSequence(),
+                                    cctx.cacheId());
+                            }
+
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    // Mark as last supply message.
+                    s.last(part);
+
+                    if (ack) {
+                        s.markAck();
+
+                        break; // Partition for loop.
+                    }
+                }
+                finally {
+                    loc.release();
+
+                    // Reached with a non-null listener only on early exit (return, continue or exception).
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+                }
+            }
+
+            replyOld(node, d, s);
+        }
+        catch (IgniteCheckedException e) {
+            // Use the ID passed in rather than node.id(): 'node' comes from cctx.node(id) and
+            // dereferencing it here could mask the original failure with a NullPointerException.
+            U.error(log, "Failed to send partition supply message to node: " + id, e);
+        }
+    }
+
+    /**
+     * Sends an old-protocol supply message to the demanding node as an ordered message on the
+     * topic taken from the demand message, using the demand message timeout.
+     *
+     * @param n Node.
+     * @param d Demand message.
+     * @param s Supply message.
+     * @return {@code True} if message was sent, {@code false} if recipient left grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+        throws IgniteCheckedException {
+        boolean delivered = false;
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+            }
+
+            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+            delivered = true;
+        }
+        catch (ClusterTopologyCheckedException nodeLeft) {
+            // Topology exception is not propagated: the caller sees 'false' instead.
+            if (log.isDebugEnabled()) {
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
+            }
+        }
+
+        return delivered;
+    }
+}


[2/6] ignite git commit: Ignite-1093 Improved rebalancing

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
new file mode 100644
index 0000000..01056ac
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -0,0 +1,423 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Partition supply message.
+ */
+public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements GridCacheDeployable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Worker ID ({@code -1} until set by the constructor). */
+    private int workerId = -1;
+
+    /** Update sequence. */
+    private long updateSeq;
+
+    /** Acknowledgement flag, set by {@code markAck()}. */
+    private boolean ack;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Partitions that have been fully sent; created lazily on first {@code last(int)} call. */
+    @GridDirectCollection(int.class)
+    private Collection<Integer> last;
+
+    /** Partitions which were not found; created lazily on first {@code missed(int)} call. */
+    @GridToStringInclude
+    @GridDirectCollection(int.class)
+    private Collection<Integer> missed;
+
+    /** Entries, keyed by partition. */
+    @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
+    private Map<Integer, CacheEntryInfoCollection> infos = new HashMap<>();
+
+    /** Message size: running counter increased as partitions and entries are added. */
+    @GridDirectTransient
+    private int msgSize;
+
+    /**
+     * Creates a supply message for the given worker, update sequence, cache and topology version.
+     *
+     * @param workerId Worker ID (asserted to be non-negative).
+     * @param updateSeq Update sequence for this node (asserted to be positive).
+     * @param cacheId Cache ID.
+     * @param topVer Topology version.
+     */
+    GridDhtPartitionSupplyMessageV2(int workerId, long updateSeq, int cacheId, AffinityTopologyVersion topVer) {
+        assert workerId >= 0;
+        assert updateSeq > 0;
+
+        this.workerId = workerId;
+        this.updateSeq = updateSeq;
+        this.topVer = topVer;
+        this.cacheId = cacheId;
+    }
+
+    /**
+     * No-argument constructor required for {@link Externalizable}; leaves all fields at their
+     * declared defaults.
+     */
+    public GridDhtPartitionSupplyMessageV2() {
+        /* No-op. */
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean allowForStartup() {
+        // Unconditionally enabled for this message type.
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean ignoreClassErrors() {
+        // Unconditionally enabled for this message type.
+        return true;
+    }
+
+    /**
+     * Gets the worker ID this message was created with.
+     *
+     * @return Worker ID.
+     */
+    int workerId() {
+        return this.workerId;
+    }
+
+    /**
+     * Gets the update sequence this message was created with.
+     *
+     * @return Update sequence.
+     */
+    long updateSequence() {
+        return this.updateSeq;
+    }
+
+    /**
+     * Sets the acknowledgement flag on this message.
+     */
+    void markAck() {
+        this.ack = true;
+    }
+
+    /**
+     * Tells whether {@link #markAck()} has set the acknowledgement flag.
+     *
+     * @return Acknowledgement flag.
+     */
+    boolean ack() {
+        return this.ack;
+    }
+
+    /**
+     * Gets the topology version passed to the constructor.
+     *
+     * @return Topology version for which demand message is sent.
+     */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return this.topVer;
+    }
+
+    /**
+     * Gets the partitions registered through {@link #last(int)}.
+     *
+     * @return Partitions that have been fully sent, or an empty set if none were registered.
+     */
+    Collection<Integer> last() {
+        if (last != null)
+            return last;
+
+        return Collections.<Integer>emptySet();
+    }
+
+    /**
+     * Registers a partition as fully sent. On first registration the message size grows by 4 and,
+     * if the partition has no entry collection yet, an empty initialized one is stored for it.
+     *
+     * @param p Partition which was fully sent.
+     */
+    void last(int p) {
+        if (last == null)
+            last = new HashSet<>();
+
+        // Already registered: nothing to account for.
+        if (!last.add(p))
+            return;
+
+        msgSize += 4;
+
+        if (infos.containsKey(p))
+            return;
+
+        // If partition is empty, we need to add it.
+        CacheEntryInfoCollection emptyCol = new CacheEntryInfoCollection();
+
+        emptyCol.init();
+
+        infos.put(p, emptyCol);
+    }
+
+    /**
+     * Registers a missed partition; the message size grows by 4 the first time a given
+     * partition is registered.
+     *
+     * @param p Missed partition.
+     */
+    void missed(int p) {
+        if (missed == null)
+            missed = new HashSet<>();
+
+        boolean added = missed.add(p);
+
+        if (added)
+            msgSize += 4;
+    }
+
+    /**
+     * Gets the partitions registered through {@link #missed(int)}.
+     *
+     * @return Missed partitions, or an empty set if none were registered.
+     */
+    Collection<Integer> missed() {
+        if (missed != null)
+            return missed;
+
+        return Collections.<Integer>emptySet();
+    }
+
+    /**
+     * Gets the internal map of entry collections keyed by partition (not a copy).
+     *
+     * @return Entries.
+     */
+    Map<Integer, CacheEntryInfoCollection> infos() {
+        return this.infos;
+    }
+
+    /**
+     * Gets the size counter accumulated while partitions and entries were added.
+     *
+     * @return Message size.
+     */
+    int messageSize() {
+        return this.msgSize;
+    }
+
+    /**
+     * Marshals the entry, adds its marshalled size to the message size and appends it to the
+     * collection of the given partition. The collection is created (adding 4 to the message size)
+     * if the partition has none yet.
+     *
+     * @param p Partition.
+     * @param info Entry to add.
+     * @param ctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+        assert info != null;
+
+        marshalInfo(info, ctx);
+
+        msgSize += info.marshalledSize(ctx);
+
+        CacheEntryInfoCollection partInfos = infos.get(p);
+
+        if (partInfos == null) {
+            // First entry for this partition.
+            msgSize += 4;
+
+            partInfos = new CacheEntryInfoCollection();
+
+            infos.put(p, partInfos);
+
+            partInfos.init();
+        }
+
+        partInfos.add(info);
+    }
+
+    /**
+     * Adds an entry after asserting that it carries a key (object or bytes) and a value, then
+     * delegates to {@link #addEntry(int, GridCacheEntryInfo, GridCacheContext)} for marshalling,
+     * size accounting and per-partition bookkeeping.
+     *
+     * @param p Partition.
+     * @param info Entry to add.
+     * @param ctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+        assert info != null;
+        assert (info.key() != null || info.keyBytes() != null);
+        assert info.value() != null;
+
+        // The remaining steps were a verbatim copy of addEntry(); reuse it instead of duplicating.
+        // addEntry() also marshals the info, which is needed to initialize it properly.
+        addEntry(p, info, ctx);
+    }
+
+    /** {@inheritDoc} Additionally unmarshals every carried entry with the context of cache {@code cacheId}. */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
+        // Walk each partition's collection; the indexed inner loop is presumably deliberate (inspection suppressed).
+        for (CacheEntryInfoCollection col : infos().values()) {
+            List<GridCacheEntryInfo>  entries = col.infos();
+
+            for (int i = 0; i < entries.size(); i++)
+                entries.get(i).unmarshal(cacheCtx, ldr);
+        }
+    }
+
+    /**
+     * @return Size of the {@code infos} map, i.e. the number of partitions with entries (not the cache entry count).
+     */
+    public int size() {
+        return infos.size();
+    }
+
+    /** {@inheritDoc} Writes the superclass part, the header (once), then this class's fields in states 3-9. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+        // No break between cases: starting at writer.state(), every remaining field is written in order.
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeBoolean("ack", ack))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeLong("updateSeq", updateSeq))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeInt("workerId", workerId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads the superclass part, then this class's fields in states 3-9 (same order as writeTo). */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+        // No break between cases; when reader.isLastRead() is false we return before the state advances.
+        switch (reader.state()) {
+            case 3:
+                ack = reader.readBoolean("ack");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                last = reader.readCollection("last", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                missed = reader.readCollection("missed", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                updateSeq = reader.readLong("updateSeq");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                workerId = reader.readInt("workerId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Returns the constant 114, which {@code writeTo} passes to {@code writeHeader}. */
+    @Override public byte directType() {
+        return 114;
+    }
+
+    /** {@inheritDoc} Returns 10; states 3-9 are handled in this class, the lower ones presumably by the superclass. */
+    @Override public byte fieldsCount() {
+        return 10;
+    }
+
+    /** {@inheritDoc} Includes {@code size()}, the partition keys of {@code infos} and the superclass string. */
+    @Override public String toString() {
+        return S.toString(GridDhtPartitionSupplyMessageV2.class, this,
+            "size", size(),
+            "parts", infos.keySet(),
+            "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
deleted file mode 100644
index 13cfef3..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ /dev/null
@@ -1,545 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
-
-/**
- * Thread pool for supplying partitions to demanding nodes.
- */
-class GridDhtPartitionSupplyPool {
-    /** */
-    private final GridCacheContext<?, ?> cctx;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private final ReadWriteLock busyLock;
-
-    /** */
-    private GridDhtPartitionTopology top;
-
-    /** */
-    private final Collection<SupplyWorker> workers = new LinkedList<>();
-
-    /** */
-    private final BlockingQueue<DemandMessage> queue = new LinkedBlockingDeque<>();
-
-    /** */
-    private final boolean depEnabled;
-
-    /** Preload predicate. */
-    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
-    /**
-     * @param cctx Cache context.
-     * @param busyLock Shutdown lock.
-     */
-    GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
-        assert cctx != null;
-        assert busyLock != null;
-
-        this.cctx = cctx;
-        this.busyLock = busyLock;
-
-        log = cctx.logger(getClass());
-
-        top = cctx.dht().topology();
-
-        if (!cctx.kernalContext().clientNode()) {
-            int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
-            for (int i = 0; i < poolSize; i++)
-                workers.add(new SupplyWorker());
-
-            cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
-                @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
-                    processDemandMessage(id, m);
-                }
-            });
-        }
-
-        depEnabled = cctx.gridDeploy().enabled();
-    }
-
-    /**
-     *
-     */
-    void start() {
-        for (SupplyWorker w : workers)
-            new IgniteThread(cctx.gridName(), "preloader-supply-worker", w).start();
-    }
-
-    /**
-     *
-     */
-    void stop() {
-        U.cancel(workers);
-        U.join(workers, log);
-
-        top = null;
-    }
-
-    /**
-     * Sets preload predicate for supply pool.
-     *
-     * @param preloadPred Preload predicate.
-     */
-    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
-        this.preloadPred = preloadPred;
-    }
-
-    /**
-     * @return Size of this thread pool.
-     */
-    int poolSize() {
-        return cctx.config().getRebalanceThreadPoolSize();
-    }
-
-    /**
-     * @return {@code true} if entered to busy state.
-     */
-    private boolean enterBusy() {
-        if (busyLock.readLock().tryLock())
-            return true;
-
-        if (log.isDebugEnabled())
-            log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId());
-
-        return false;
-    }
-
-    /**
-     * @param nodeId Sender node ID.
-     * @param d Message.
-     */
-    private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage d) {
-        if (!enterBusy())
-            return;
-
-        try {
-            if (cctx.rebalanceEnabled()) {
-                if (log.isDebugEnabled())
-                    log.debug("Received partition demand [node=" + nodeId + ", demand=" + d + ']');
-
-                queue.offer(new DemandMessage(nodeId, d));
-            }
-            else
-                U.warn(log, "Received partition demand message when rebalancing is disabled (will ignore): " + d);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     *
-     */
-    private void leaveBusy() {
-        busyLock.readLock().unlock();
-    }
-
-    /**
-     * @param deque Deque to poll from.
-     * @param w Worker.
-     * @return Polled item.
-     * @throws InterruptedException If interrupted.
-     */
-    @Nullable private <T> T poll(BlockingQueue<T> deque, GridWorker w) throws InterruptedException {
-        assert w != null;
-
-        // There is currently a case where {@code interrupted}
-        // flag on a thread gets flipped during stop which causes the pool to hang.  This check
-        // will always make sure that interrupted flag gets reset before going into wait conditions.
-        // The true fix should actually make sure that interrupted flag does not get reset or that
-        // interrupted exception gets propagated. Until we find a real fix, this method should
-        // always work to make sure that there is no hanging during stop.
-        if (w.isCancelled())
-            Thread.currentThread().interrupt();
-
-        return deque.poll(2000, MILLISECONDS);
-    }
-
-    /**
-     * Supply work.
-     */
-    private class SupplyWorker extends GridWorker {
-        /** Hide worker logger and use cache logger. */
-        private IgniteLogger log = GridDhtPartitionSupplyPool.this.log;
-
-        /**
-         * Default constructor.
-         */
-        private SupplyWorker() {
-            super(cctx.gridName(), "preloader-supply-worker", GridDhtPartitionSupplyPool.this.log);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            while (!isCancelled()) {
-                DemandMessage msg = poll(queue, this);
-
-                if (msg == null)
-                    continue;
-
-                ClusterNode node = cctx.discovery().node(msg.senderId());
-
-                if (node == null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Received message from non-existing node (will ignore): " + msg);
-
-                    continue;
-                }
-
-                processMessage(msg, node);
-            }
-        }
-
-        /**
-         * @param msg Message.
-         * @param node Demander.
-         */
-        private void processMessage(DemandMessage msg, ClusterNode node) {
-            assert msg != null;
-            assert node != null;
-
-            GridDhtPartitionDemandMessage d = msg.message();
-
-            GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
-                d.updateSequence(), cctx.cacheId());
-
-            long preloadThrottle = cctx.config().getRebalanceThrottle();
-
-            boolean ack = false;
-
-            try {
-                for (int part : d.partitions()) {
-                    GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
-
-                    if (loc == null || loc.state() != OWNING || !loc.reserve()) {
-                        // Reply with partition of "-1" to let sender know that
-                        // this node is no longer an owner.
-                        s.missed(part);
-
-                        if (log.isDebugEnabled())
-                            log.debug("Requested partition is not owned by local node [part=" + part +
-                                ", demander=" + msg.senderId() + ']');
-
-                        continue;
-                    }
-
-                    GridCacheEntryInfoCollectSwapListener swapLsnr = null;
-
-                    try {
-                        if (cctx.isSwapOrOffheapEnabled()) {
-                            swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
-
-                            cctx.swap().addOffHeapListener(part, swapLsnr);
-                            cctx.swap().addSwapListener(part, swapLsnr);
-                        }
-
-                        boolean partMissing = false;
-
-                        for (GridCacheEntryEx e : loc.entries()) {
-                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                // Demander no longer needs this partition, so we send '-1' partition and move on.
-                                s.missed(part);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Demanding node does not need requested partition [part=" + part +
-                                        ", nodeId=" + msg.senderId() + ']');
-
-                                partMissing = true;
-
-                                break;
-                            }
-
-                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                ack = true;
-
-                                if (!reply(node, d, s))
-                                    return;
-
-                                // Throttle preloading.
-                                if (preloadThrottle > 0)
-                                    U.sleep(preloadThrottle);
-
-                                s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
-                                    cctx.cacheId());
-                            }
-
-                            GridCacheEntryInfo info = e.info();
-
-                            if (info != null && !info.isNew()) {
-                                if (preloadPred == null || preloadPred.apply(info))
-                                    s.addEntry(part, info, cctx);
-                                else if (log.isDebugEnabled())
-                                    log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
-                                        info);
-                            }
-                        }
-
-                        if (partMissing)
-                            continue;
-
-                        if (cctx.isSwapOrOffheapEnabled()) {
-                            GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
-                                cctx.swap().iterator(part);
-
-                            // Iterator may be null if space does not exist.
-                            if (iter != null) {
-                                try {
-                                    boolean prepared = false;
-
-                                    for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
-                                        if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                            // Demander no longer needs this partition,
-                                            // so we send '-1' partition and move on.
-                                            s.missed(part);
-
-                                            if (log.isDebugEnabled())
-                                                log.debug("Demanding node does not need requested partition " +
-                                                    "[part=" + part + ", nodeId=" + msg.senderId() + ']');
-
-                                            partMissing = true;
-
-                                            break; // For.
-                                        }
-
-                                        if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                            ack = true;
-
-                                            if (!reply(node, d, s))
-                                                return;
-
-                                            // Throttle preloading.
-                                            if (preloadThrottle > 0)
-                                                U.sleep(preloadThrottle);
-
-                                            s = new GridDhtPartitionSupplyMessage(d.workerId(),
-                                                d.updateSequence(), cctx.cacheId());
-                                        }
-
-                                        GridCacheSwapEntry swapEntry = e.getValue();
-
-                                        GridCacheEntryInfo info = new GridCacheEntryInfo();
-
-                                        info.keyBytes(e.getKey());
-                                        info.ttl(swapEntry.ttl());
-                                        info.expireTime(swapEntry.expireTime());
-                                        info.version(swapEntry.version());
-                                        info.value(swapEntry.value());
-
-                                        if (preloadPred == null || preloadPred.apply(info))
-                                            s.addEntry0(part, info, cctx);
-                                        else {
-                                            if (log.isDebugEnabled())
-                                                log.debug("Rebalance predicate evaluated to false (will not send " +
-                                                    "cache entry): " + info);
-
-                                            continue;
-                                        }
-
-                                        // Need to manually prepare cache message.
-                                        if (depEnabled && !prepared) {
-                                            ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
-                                                cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
-                                                swapEntry.valueClassLoaderId() != null ?
-                                                    cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
-                                                    null;
-
-                                            if (ldr == null)
-                                                continue;
-
-                                            if (ldr instanceof GridDeploymentInfo) {
-                                                s.prepare((GridDeploymentInfo)ldr);
-
-                                                prepared = true;
-                                            }
-                                        }
-                                    }
-
-                                    if (partMissing)
-                                        continue;
-                                }
-                                finally {
-                                    iter.close();
-                                }
-                            }
-                        }
-
-                        // Stop receiving promote notifications.
-                        if (swapLsnr != null) {
-                            cctx.swap().removeOffHeapListener(part, swapLsnr);
-                            cctx.swap().removeSwapListener(part, swapLsnr);
-                        }
-
-                        if (swapLsnr != null) {
-                            Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
-
-                            swapLsnr = null;
-
-                            for (GridCacheEntryInfo info : entries) {
-                                if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                    // Demander no longer needs this partition,
-                                    // so we send '-1' partition and move on.
-                                    s.missed(part);
-
-                                    if (log.isDebugEnabled())
-                                        log.debug("Demanding node does not need requested partition " +
-                                            "[part=" + part + ", nodeId=" + msg.senderId() + ']');
-
-                                    // No need to continue iteration over swap entries.
-                                    break;
-                                }
-
-                                if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                    ack = true;
-
-                                    if (!reply(node, d, s))
-                                        return;
-
-                                    s = new GridDhtPartitionSupplyMessage(d.workerId(),
-                                        d.updateSequence(),
-                                        cctx.cacheId());
-                                }
-
-                                if (preloadPred == null || preloadPred.apply(info))
-                                    s.addEntry(part, info, cctx);
-                                else if (log.isDebugEnabled())
-                                    log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
-                                        info);
-                            }
-                        }
-
-                        // Mark as last supply message.
-                        s.last(part);
-
-                        if (ack) {
-                            s.markAck();
-
-                            break; // Partition for loop.
-                        }
-                    }
-                    finally {
-                        loc.release();
-
-                        if (swapLsnr != null) {
-                            cctx.swap().removeOffHeapListener(part, swapLsnr);
-                            cctx.swap().removeSwapListener(part, swapLsnr);
-                        }
-                    }
-                }
-
-                reply(node, d, s);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
-            }
-        }
-
-        /**
-         * @param n Node.
-         * @param d Demand message.
-         * @param s Supply message.
-         * @return {@code True} if message was sent, {@code false} if recipient left grid.
-         * @throws IgniteCheckedException If failed.
-         */
-        private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
-            throws IgniteCheckedException {
-            try {
-                if (log.isDebugEnabled())
-                    log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
-
-                cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
-
-                return true;
-            }
-            catch (ClusterTopologyCheckedException ignore) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send partition supply message because node left grid: " + n.id());
-
-                return false;
-            }
-        }
-    }
-
-    /**
-     * Demand message wrapper.
-     */
-    private static class DemandMessage extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * @param sndId Sender ID.
-         * @param msg Message.
-         */
-        DemandMessage(UUID sndId, GridDhtPartitionDemandMessage msg) {
-            super(sndId, msg);
-        }
-
-        /**
-         * Empty constructor required for {@link Externalizable}.
-         */
-        public DemandMessage() {
-            // No-op.
-        }
-
-        /**
-         * @return Sender ID.
-         */
-        UUID senderId() {
-            return get1();
-        }
-
-        /**
-         * @return Message.
-         */
-        public GridDhtPartitionDemandMessage message() {
-            return get2();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']';
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index a43ebe2..7a9deba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
@@ -42,6 +41,7 @@ import java.util.concurrent.locks.*;
 
 import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 import static org.apache.ignite.internal.util.GridConcurrentFactory.*;
 
 /**
@@ -61,10 +61,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
 
     /** Partition suppliers. */
-    private GridDhtPartitionSupplyPool supplyPool;
+    private GridDhtPartitionSupplier supplier;
 
     /** Partition demanders. */
-    private GridDhtPartitionDemandPool demandPool;
+    private GridDhtPartitionDemander demander;
 
     /** Start future. */
     private GridFutureAdapter<Object> startFut;
@@ -76,6 +76,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
         new ConcurrentHashMap8<>();
 
+    /** Demand lock. */
+    private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -159,8 +162,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                 }
             });
 
-        supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock);
-        demandPool = new GridDhtPartitionDemandPool(cctx, busyLock);
+        supplier = new GridDhtPartitionSupplier(cctx);
+        demander = new GridDhtPartitionDemander(cctx, demandLock);
 
         cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
@@ -180,18 +183,18 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
         topVer.setIfGreater(startTopVer);
 
-        supplyPool.start();
-        demandPool.start();
+        supplier.start();
+        demander.start();
     }
 
     /** {@inheritDoc} */
     @Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
         super.preloadPredicate(preloadPred);
 
-        assert supplyPool != null && demandPool != null : "preloadPredicate may be called only after start()";
+        assert supplier != null && demander != null : "preloadPredicate may be called only after start()";
 
-        supplyPool.preloadPredicate(preloadPred);
-        demandPool.preloadPredicate(preloadPred);
+        supplier.preloadPredicate(preloadPred);
+        demander.preloadPredicate(preloadPred);
     }
 
     /** {@inheritDoc} */
@@ -205,62 +208,165 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         // Acquire write busy lock.
         busyLock.writeLock().lock();
 
-        if (supplyPool != null)
-            supplyPool.stop();
+        if (supplier != null)
+            supplier.stop();
 
-        if (demandPool != null)
-            demandPool.stop();
+        if (demander != null)
+            demander.stop();
 
         top = null;
     }
 
     /** {@inheritDoc} */
     @Override public void onInitialExchangeComplete(@Nullable Throwable err) {
-        if (err == null) {
+        if (err == null)
             startFut.onDone();
+        else
+            startFut.onDone(err);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onReconnected() {
+        startFut = new GridFutureAdapter<>();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
+        demander.updateLastExchangeFuture(lastFut);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
+        // No assignments for disabled preloader.
+        GridDhtPartitionTopology top = cctx.dht().topology();
+
+        if (!cctx.rebalanceEnabled())
+            return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
-            final long start = U.currentTimeMillis();
+        int partCnt = cctx.affinity().partitions();
 
-            final CacheConfiguration cfg = cctx.config();
+        assert exchFut.forcePreload() || exchFut.dummyReassign() ||
+            exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
+            "Topology version mismatch [exchId=" + exchFut.exchangeId() +
+                ", topVer=" + top.topologyVersion() + ']';
 
-            if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
-                U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
+        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
-                demandPool.syncFuture().listen(new CI1<Object>() {
-                    @Override public void apply(Object t) {
-                        U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " +
-                            "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
+        AffinityTopologyVersion topVer = assigns.topologyVersion();
+
+        for (int p = 0; p < partCnt; p++) {
+            if (cctx.shared().exchange().hasPendingExchange()) {
+                if (log.isDebugEnabled())
+                    log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
+                        exchFut.exchangeId());
+
+                break;
+            }
+
+            // If partition belongs to local node.
+            if (cctx.affinity().localNode(p, topVer)) {
+                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+                assert part != null;
+                assert part.id() == p;
+
+                if (part.state() != MOVING) {
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping partition assignment (state is not MOVING): " + part);
+
+                    continue; // For.
+                }
+
+                Collection<ClusterNode> picked = pickedOwners(p, topVer);
+
+                if (picked.isEmpty()) {
+                    top.own(part);
+
+                    if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+
+                        cctx.events().addPreloadEvent(p,
+                            EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
+                            discoEvt.type(), discoEvt.timestamp());
                     }
-                });
+
+                    if (log.isDebugEnabled())
+                        log.debug("Owning partition as there are no other owners: " + part);
+                }
+                else {
+                    ClusterNode n = F.first(picked);
+
+                    GridDhtPartitionDemandMessage msg = assigns.get(n);
+
+                    if (msg == null) {
+                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+                            top.updateSequence(),
+                            exchFut.exchangeId().topologyVersion(),
+                            cctx.cacheId()));
+                    }
+
+                    msg.addPartition(p);
+                }
             }
         }
-        else
-            startFut.onDone(err);
+
+        return assigns;
     }
 
-    /** {@inheritDoc} */
-    @Override public void onReconnected() {
-        startFut = new GridFutureAdapter<>();
+    /**
+     * @param p Partition.
+     * @param topVer Topology version.
+     * @return Picked owners.
+     */
+    private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
+
+        int affCnt = affNodes.size();
+
+        Collection<ClusterNode> rmts = remoteOwners(p, topVer);
+
+        int rmtCnt = rmts.size();
+
+        if (rmtCnt <= affCnt)
+            return rmts;
+
+        List<ClusterNode> sorted = new ArrayList<>(rmts);
+
+        // Sort in descending order, so nodes with higher order will be first.
+        Collections.sort(sorted, CU.nodeComparator(false));
+
+        // Pick newest nodes.
+        return sorted.subList(0, affCnt);
     }
 
-    /** {@inheritDoc} */
-    @Override public void onExchangeFutureAdded() {
-        demandPool.onExchangeFutureAdded();
+    /**
+     * @param p Partition.
+     * @param topVer Topology version.
+     * @return Nodes owning this partition.
+     */
+    private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
+        return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
     }
 
     /** {@inheritDoc} */
-    @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
-        demandPool.updateLastExchangeFuture(lastFut);
+    public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) {
+        demandLock.readLock().lock();
+        try {
+            demander.handleSupplyMessage(idx, id, s);
+        }
+        finally {
+            demandLock.readLock().unlock();
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
-        return demandPool.assign(exchFut);
+    public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d){
+        supplier.handleDemandMessage(id, d);
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
-        demandPool.addAssignments(assignments, forcePreload);
+    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException {
+        demander.addAssignments(assignments, forcePreload);
     }
 
     /**
@@ -272,7 +378,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> syncFuture() {
-        return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
+        return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
     }
 
     /**
@@ -531,12 +637,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void forcePreload() {
-        demandPool.forcePreload();
+        demander.forcePreload();
     }
 
     /** {@inheritDoc} */
     @Override public void unwindUndeploys() {
-        demandPool.unwindUndeploys();
+        demandLock.writeLock().lock();
+
+        try {
+            cctx.deploy().unwind(cctx);
+        }
+        finally {
+            demandLock.writeLock().unlock();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index e683dad..77b55a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1906,7 +1906,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * <p>
      * This method is intended for test purposes only.
      */
-    void simulateNodeFailure() {
+    protected void simulateNodeFailure() {
         impl.simulateNodeFailure();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java
index a37f585..88540db 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingRebalanceErrorTest.java
@@ -53,7 +53,7 @@ public class IgniteCacheP2pUnmarshallingRebalanceErrorTest extends IgniteCacheP2
 
         readCnt.set(Integer.MAX_VALUE);
 
-        for (int i = 0; i <= 1000; i++)
+        for (int i = 0; i <= 100000; i++)
             jcache(0).put(new TestKey(String.valueOf(++key)), "");
 
         startGrid(3);
@@ -68,6 +68,7 @@ public class IgniteCacheP2pUnmarshallingRebalanceErrorTest extends IgniteCacheP2
         try {
             jcache(3).get(new TestKey(String.valueOf(key)));
 
+            //Can fail in case rebalancing finished before get
             assert false : "p2p marshalling failed, but error response was not sent";
         }
         catch (CacheException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
new file mode 100644
index 0000000..a17fc7a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
@@ -0,0 +1,85 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+
+/**
+ *
+ */
+public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncSelfTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        CacheConfiguration cacheCfg = iCfg.getCacheConfiguration()[0];
+
+        cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
+
+        cacheCfg = iCfg.getCacheConfiguration()[1];
+
+        cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
+
+        iCfg.setDiscoverySpi(new FailableTcpDiscoverySpi());
+
+        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+
+        if (getTestGridName(20).equals(gridName))
+            spi = (FailableTcpDiscoverySpi)iCfg.getDiscoverySpi();
+
+        return iCfg;
+    }
+
+    public static class FailableTcpDiscoverySpi extends TcpDiscoverySpi {
+        public void fail() {
+            simulateNodeFailure();
+        }
+    }
+
+    private volatile FailableTcpDiscoverySpi spi;
+
+    /**
+     * @throws Exception
+     */
+    public void testNodeFailedAtRebalancing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        generateData(ignite);
+
+        log.info("Preloading started.");
+
+        startGrid(1);
+
+        waitForRebalancing(1, 2);
+
+        startGrid(20);
+
+        waitForRebalancing(20, 3);
+
+        spi.fail();
+
+        waitForRebalancing(0, 4);
+        waitForRebalancing(1, 4);
+
+        stopAllGrids();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
new file mode 100644
index 0000000..bd1bf28
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -0,0 +1,270 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
+    /** */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    private static int TEST_SIZE = 1_000_000;
+
+    /** cache name. */
+    protected static String CACHE_NAME_DHT = "cache";
+
+    /** cache 2 name. */
+    protected static String CACHE_2_NAME_DHT = "cache2";
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return Long.MAX_VALUE;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+
+        if (getTestGridName(10).equals(gridName))
+            iCfg.setClientMode(true);
+
+        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
+        cacheCfg.setName(CACHE_NAME_DHT);
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+        //cacheCfg.setRebalanceBatchSize(1024);
+        //cacheCfg.setRebalanceBatchesCount(1);
+        cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheCfg.setBackups(1);
+
+        CacheConfiguration<Integer, Integer> cacheCfg2 = new CacheConfiguration<>();
+
+        cacheCfg2.setName(CACHE_2_NAME_DHT);
+        cacheCfg2.setCacheMode(CacheMode.PARTITIONED);
+        //cacheCfg2.setRebalanceBatchSize(1024);
+        //cacheCfg2.setRebalanceBatchesCount(1);
+        cacheCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheCfg2.setBackups(1);
+
+        iCfg.setRebalanceThreadPoolSize(4);
+        iCfg.setCacheConfiguration(cacheCfg, cacheCfg2);
+        return iCfg;
+    }
+
+    /**
+     * @param ignite Ignite.
+     */
+    protected void generateData(Ignite ignite) {
+        try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) {
+            for (int i = 0; i < TEST_SIZE; i++) {
+                if (i % 1_000_000 == 0)
+                    log.info("Prepared " + i / 1_000_000 + "m entries.");
+
+                stmr.addData(i, i);
+            }
+
+            stmr.flush();
+        }
+        try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_2_NAME_DHT)) {
+            for (int i = 0; i < TEST_SIZE; i++) {
+                if (i % 1_000_000 == 0)
+                    log.info("Prepared " + i / 1_000_000 + "m entries.");
+
+                stmr.addData(i, i + 3);
+            }
+
+            stmr.flush();
+        }
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @throws IgniteCheckedException
+     */
+    protected void checkData(Ignite ignite) throws IgniteCheckedException {
+        for (int i = 0; i < TEST_SIZE; i++) {
+            if (i % 1_000_000 == 0)
+                log.info("Checked " + i / 1_000_000 + "m entries.");
+
+            assert ignite.cache(CACHE_NAME_DHT).get(i) != null && ignite.cache(CACHE_NAME_DHT).get(i).equals(i) :
+                "key " + i + " does not match (" + ignite.cache(CACHE_NAME_DHT).get(i) + ")";
+        }
+        for (int i = 0; i < TEST_SIZE; i++) {
+            if (i % 1_000_000 == 0)
+                log.info("Checked " + i / 1_000_000 + "m entries.");
+
+            assert ignite.cache(CACHE_2_NAME_DHT).get(i) != null && ignite.cache(CACHE_2_NAME_DHT).get(i).equals(i + 3) :
+                "key " + i + " does not match (" + ignite.cache(CACHE_2_NAME_DHT).get(i) + ")";
+        }
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testSimpleRebalancing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        generateData(ignite);
+
+        log.info("Preloading started.");
+
+        long start = System.currentTimeMillis();
+
+        startGrid(1);
+
+        waitForRebalancing(1, 2);
+
+        long spend = (System.currentTimeMillis() - start) / 1000;
+
+        stopGrid(0);
+
+        checkData(grid(1));
+
+        log.info("Spend " + spend + " seconds to rebalance entries.");
+
+        stopAllGrids();
+    }
+
+    /**
+     * @param id Id.
+     * @param top Topology.
+     */
+    protected void waitForRebalancing(int id, int top) throws IgniteCheckedException {
+        boolean finished = false;
+
+        while (!finished) {
+            finished = true;
+
+            for (GridCacheAdapter c : grid(id).context().cache().internalCaches()) {
+                GridDhtPartitionDemander.SyncFuture fut = (GridDhtPartitionDemander.SyncFuture)c.preloader().syncFuture();
+                if (fut.topologyVersion().topologyVersion() != top) {
+                    finished = false;
+
+                    break;
+                }
+                else
+                    fut.get();
+            }
+        }
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testComplexRebalancing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        generateData(ignite);
+
+        log.info("Preloading started.");
+
+        long start = System.currentTimeMillis();
+
+        //will be started simultaneously in case of ASYNC mode
+        startGrid(1);
+        startGrid(2);
+        startGrid(3);
+        startGrid(4);
+
+        //wait until cache rebalanced in async mode
+        waitForRebalancing(1, 5);
+        waitForRebalancing(2, 5);
+        waitForRebalancing(3, 5);
+        waitForRebalancing(4, 5);
+
+        //cache rebalanced in async mode
+
+        stopGrid(0);
+
+        //wait until cache rebalanced
+        waitForRebalancing(1, 6);
+        waitForRebalancing(2, 6);
+        waitForRebalancing(3, 6);
+        waitForRebalancing(4, 6);
+
+        //cache rebalanced
+
+        stopGrid(1);
+
+        //wait until cache rebalanced
+        waitForRebalancing(2, 7);
+        waitForRebalancing(3, 7);
+        waitForRebalancing(4, 7);
+
+        //cache rebalanced
+
+        stopGrid(2);
+
+        //wait until cache rebalanced
+        waitForRebalancing(3, 8);
+        waitForRebalancing(4, 8);
+
+        //cache rebalanced
+
+        stopGrid(3);
+
+        long spend = (System.currentTimeMillis() - start) / 1000;
+
+        checkData(grid(4));
+
+        log.info("Spend " + spend + " seconds to rebalance entries.");
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testBackwardCompatibility() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        Map<String, Object> map = new HashMap<>(ignite.cluster().localNode().attributes());
+
+        map.put(IgniteNodeAttributes.REBALANCING_VERSION, 0);
+
+        ((TcpDiscoveryNode)ignite.cluster().localNode()).setAttributes(map);
+
+        generateData(ignite);
+
+        startGrid(1);
+
+        waitForRebalancing(1, 2);
+
+        stopGrid(0);
+
+        checkData(grid(1));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
index 0f4d24d..641fc5d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
@@ -126,26 +126,6 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception If failed.
-     */
-    public void testSingleZeroPoolSize() throws Exception {
-        preloadMode = SYNC;
-        poolSize = 0;
-
-        try {
-            startGrid(1);
-
-            assert false : "Grid should have been failed to start.";
-        }
-        catch (IgniteCheckedException e) {
-            info("Caught expected exception: " + e);
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
      * @throws Exception If test failed.
      */
     public void testIntegrity() throws Exception {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index c20e901..c787f21 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.*;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.preloader.*;
 import org.apache.ignite.internal.processors.cache.local.*;
@@ -79,6 +80,9 @@ public class IgniteCacheTestSuite3 extends TestSuite {
         suite.addTestSuite(GridCacheReplicatedPreloadStartStopEventsSelfTest.class);
         suite.addTestSuite(GridReplicatedTxPreloadTest.class);
 
+        suite.addTestSuite(GridCacheRebalancingSyncSelfTest.class);
+        suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class);
+
         suite.addTestSuite(IgniteTxReentryNearSelfTest.class);
         suite.addTestSuite(IgniteTxReentryColocatedSelfTest.class);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/yardstick/config/benchmark-rebalancing-win.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-rebalancing-win.properties b/modules/yardstick/config/benchmark-rebalancing-win.properties
new file mode 100644
index 0000000..978e388
--- /dev/null
+++ b/modules/yardstick/config/benchmark-rebalancing-win.properties
@@ -0,0 +1,60 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+::
+:: Contains benchmarks for ATOMIC cache.
+::
+
+:: JVM options.
+set JVM_OPTS=%JVM_OPTS% -DIGNITE_QUIET=false
+
+:: Uncomment to enable concurrent garbage collection (GC) if you encounter long GC pauses.
+:: set JVM_OPTS=%JVM_OPTS%^
+::  -XX:+UseParNewGC^
+::  -XX:+UseConcMarkSweepGC^
+::  -XX:+UseTLAB^
+::  -XX:NewSize=128m^
+::  -XX:MaxNewSize=128m^
+::  -XX:MaxTenuringThreshold=0^
+::  -XX:SurvivorRatio=1024^
+::  -XX:+UseCMSInitiatingOccupancyOnly^
+::  -XX:CMSInitiatingOccupancyFraction=60
+
+:: List of default probes.
+BENCHMARK_DEFAULT_PROBES=ThroughputLatencyProbe,PercentileProbe
+
+:: Packages where the specified benchmark is searched by reflection mechanism.
+BENCHMARK_PACKAGES=org.yardstickframework,org.apache.ignite.yardstick
+
+:: Probe point writer class name.
+:: BENCHMARK_WRITER=
+
+:: Comma-separated list of the hosts to run BenchmarkServers on. 1 node on local host is enabled by default.
+set SERVER_HOSTS=localhost
+
+:: Comma-separated list of the hosts to run BenchmarkDrivers on. 1 node on local host is enabled by default.
+set DRIVER_HOSTS=localhost
+
+:: Remote username.
+:: set REMOTE_USER=
+:: set RESTART_SERVERS=localhost:10:100
+
+:: Run configuration which contains all benchmarks.
+:: Note that the benchmark is set to run for 200 seconds (-d 200) with no warm-up (-w 0).
+
+set CONFIGS=^
+-cfg %SCRIPT_DIR%\..\config\ignite-rebalancing-multicast-win-config.xml -b 1 -w 0 -d 200 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet2 -cl -r 5000000 -cn rebalance2

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/yardstick/config/benchmark-rebalancing.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-rebalancing.properties b/modules/yardstick/config/benchmark-rebalancing.properties
new file mode 100644
index 0000000..f6d6967
--- /dev/null
+++ b/modules/yardstick/config/benchmark-rebalancing.properties
@@ -0,0 +1,79 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+#
+# Contains benchmarks for ATOMIC cache.
+#
+
+now0=`date +'%H%M%S'`
+
+# JVM options.
+JVM_OPTS=${JVM_OPTS}" -DIGNITE_QUIET=false -Xms15g -Xmx15g"
+
+# Concurrent garbage collection (GC) and GC logging options; comment them out if you do not need them.
+JVM_OPTS=${JVM_OPTS}" \
+  -Xloggc:./gc${now0}.log \
+  -XX:+PrintGCDetails \
+  -verbose:gc \
+  -XX:+UseParNewGC \
+  -XX:+UseConcMarkSweepGC \
+  -XX:+UseTLAB \
+  -XX:NewSize=128m \
+  -XX:MaxNewSize=128m \
+  -XX:MaxPermSize=512m \
+  -XX:MaxTenuringThreshold=0 \
+  -XX:SurvivorRatio=1024 \
+  -XX:+UseCMSInitiatingOccupancyOnly \
+  -XX:CMSInitiatingOccupancyFraction=60 \
+  -XX:+DisableExplicitGC \
+"
+
+# List of default probes.
+# Add DStatProbe or VmStatProbe if your OS supports it (e.g. if running on Linux).
+BENCHMARK_DEFAULT_PROBES=ThroughputLatencyProbe,PercentileProbe,VmStatProbe,DStatProbe
+
+# Packages where the specified benchmark is searched by reflection mechanism.
+BENCHMARK_PACKAGES=org.yardstickframework,org.apache.ignite.yardstick
+
+# Probe point writer class name.
+# BENCHMARK_WRITER=
+
+# Comma-separated list of the hosts to run BenchmarkServers on. 3 remote hosts are configured here.
+SERVER_HOSTS=10.20.0.221,10.20.0.222,10.20.0.223
+
+# Comma-separated list of the hosts to run BenchmarkDrivers on. 1 node on local host is enabled by default.
+DRIVER_HOSTS=localhost
+
+RESTART_SERVERS=10.20.0.221:145:0:1000000
+
+# Remote username.
+REMOTE_USER=gridgain
+
+# Number of nodes, used to wait for the specified number of nodes to start.
+nodesNum=$((`echo ${SERVER_HOSTS} | tr ',' '\n' | wc -l` + `echo ${DRIVER_HOSTS} | tr ',' '\n' | wc -l`))
+
+# Run configuration.
+CONFIGS="\
+-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 120 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet1 -cl -r 20000000 -cn rebalance1,\
+"
+#-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 120 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet2 -cl -r 20000000 -cn rebalance2,\
+#-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 120 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet3 -cl -r 20000000 -cn rebalance3,\
+#-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 120 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet5 -cl -r 20000000 -cn rebalance5,\
+#-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 120 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet3-1024 -cl -r 20000000 -cn rebalance3-1024,\
+#-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 120 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet3-10024 -cl -r 20000000 -cn rebalance3-10024,\
+#-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn ${nodesNum} -b 1 -w 0 -d 120 -t 64 -sm PRIMARY_SYNC -dn IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet3-100024 -cl -r 20000000 -cn rebalance3-100024,\
+#"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/yardstick/config/ignite-log4j.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/ignite-log4j.xml b/modules/yardstick/config/ignite-log4j.xml
new file mode 100644
index 0000000..ab3c781
--- /dev/null
+++ b/modules/yardstick/config/ignite-log4j.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  ~  Licensed to the Apache Software Foundation (ASF) under one or more
+  ~  contributor license agreements.  See the NOTICE file distributed with
+  ~  this work for additional information regarding copyright ownership.
+  ~  The ASF licenses this file to You under the Apache License, Version 2.0
+  ~  (the "License"); you may not use this file except in compliance with
+  ~  the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN"
+    "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
+
+<!--
+    Default log4j configuration for Ignite.
+-->
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+    <!--
+        Logs System.out messages to console.
+
+        Note, the CONSOLE and ASYNC_CONSOLE appenders are defined here but are not attached by default.
+        To enable console output, uncomment the ASYNC_CONSOLE appender-ref in the <root> element.
+    -->
+    <!--&lt;!&ndash;-->
+    <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+
+        <param name="Threshold" value="DEBUG"/>
+
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="[%d{ABSOLUTE}][%-5p][%t][%c{1}] %m%n"/>
+        </layout>
+
+        <filter class="org.apache.log4j.varia.LevelRangeFilter">
+            <param name="levelMin" value="DEBUG"/>
+            <param name="levelMax" value="WARN"/>
+        </filter>
+    </appender>
+    <appender name="ASYNC_CONSOLE" class="org.apache.log4j.AsyncAppender">
+        <param name="BufferSize" value="500"/>
+        <appender-ref ref="CONSOLE"/>
+    </appender>
+    <!--&ndash;&gt;-->
+
+    <!--
+        Logs all ERROR messages to console.
+    -->
+    <appender name="CONSOLE_ERR" class="org.apache.log4j.ConsoleAppender">
+        <!-- Log to STDERR. -->
+        <param name="Target" value="System.err"/>
+
+        <!-- Log from ERROR and higher (change to WARN if needed). -->
+        <param name="Threshold" value="ERROR"/>
+
+        <!-- The default pattern: Date Priority [Category] Message\n -->
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="[%d{ABSOLUTE}][%-5p][%t][%c{1}] %m%n"/>
+        </layout>
+    </appender>
+    <appender name="ASYNC_CONSOLE_ERR" class="org.apache.log4j.AsyncAppender">
+        <param name="BufferSize" value="500"/>
+        <appender-ref ref="CONSOLE_ERR"/>
+    </appender>
+
+    <!--
+        Logs all output to specified file.
+        By default, the logging goes to IGNITE_HOME/work/log folder
+    -->
+    <appender name="FILE" class="org.apache.ignite.logger.log4j.Log4jRollingFileAppender">
+        <param name="Threshold" value="DEBUG"/>
+        <param name="File" value="${IGNITE_HOME}/work/log/ignite.log"/>
+        <param name="Append" value="true"/>
+        <param name="MaxFileSize" value="10MB"/>
+        <param name="MaxBackupIndex" value="10"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="[%d{ABSOLUTE}][%-5p][%t][%c{1}] %m%n"/>
+        </layout>
+    </appender>
+    <appender name="ASYNC_FILE" class="org.apache.log4j.AsyncAppender">
+        <param name="BufferSize" value="500"/>
+        <appender-ref ref="FILE"/>
+    </appender>
+
+    <!--
+    <category name="org.apache.ignite">
+        <level value="DEBUG"/>
+    </category>
+    -->
+
+    <!--
+        Uncomment to disable courtesy notices, such as SPI configuration
+        consistency warnings.
+    -->
+    <!--
+    <category name="org.apache.ignite.CourtesyConfigNotice">
+        <level value="OFF"/>
+    </category>
+    -->
+
+    <category name="org.springframework">
+        <level value="WARN"/>
+    </category>
+
+    <category name="org.eclipse.jetty">
+        <level value="WARN"/>
+    </category>
+
+    <!--
+        Avoid warnings about failed bind attempt when multiple nodes running on the same host.
+    -->
+    <category name="org.eclipse.jetty.util.log">
+        <level value="ERROR"/>
+    </category>
+
+    <category name="org.eclipse.jetty.util.component">
+        <level value="ERROR"/>
+    </category>
+
+    <category name="com.amazonaws">
+        <level value="WARN"/>
+    </category>
+
+    <!-- Default settings. -->
+    <root>
+        <!-- Print out all info by default. -->
+        <level value="INFO"/>
+
+        <!-- Uncomment to enable logging to console. -->
+        <!--<appender-ref ref="ASYNC_CONSOLE"/>-->
+
+        <appender-ref ref="ASYNC_CONSOLE_ERR"/>
+        <appender-ref ref="ASYNC_FILE"/>
+    </root>
+</log4j:configuration>

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/yardstick/config/ignite-rebalancing-multicast-config.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/ignite-rebalancing-multicast-config.xml b/modules/yardstick/config/ignite-rebalancing-multicast-config.xml
new file mode 100644
index 0000000..e16c351
--- /dev/null
+++ b/modules/yardstick/config/ignite-rebalancing-multicast-config.xml
@@ -0,0 +1,174 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  ~  Licensed to the Apache Software Foundation (ASF) under one or more
+  ~  contributor license agreements.  See the NOTICE file distributed with
+  ~  this work for additional information regarding copyright ownership.
+  ~  The ASF licenses this file to You under the Apache License, Version 2.0
+  ~  (the "License"); you may not use this file except in compliance with
+  ~  the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<!--
+    Ignite Spring configuration file to startup grid.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="gridLogger">
+            <bean class="org.apache.ignite.logger.log4j.Log4JLogger">
+                <constructor-arg type="java.lang.String" value="config/ignite-log4j.xml"/>
+            </bean>
+        </property>
+
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="ipFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+                        <property name="addresses">
+                            <list>
+                                <value>10.20.0.219:47500..47509</value>
+                                <value>10.20.0.221:47500..47509</value>
+                                <value>10.20.0.222:47500..47509</value>
+                                <value>10.20.0.223:47500..47509</value>
+                            </list>
+                        </property>
+                    </bean>
+                </property>
+            </bean>
+        </property>
+
+        <property name="cacheConfiguration">
+            <list>
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="name" value="rebalance1"/>
+
+                    <property name="cacheMode" value="PARTITIONED"/>
+
+                    <property name="atomicityMode" value="ATOMIC"/>
+
+                    <property name="swapEnabled" value="false"/>
+
+                    <property name="backups" value="1"/>
+
+                    <property name="rebalanceBatchesCount" value="1"/>
+                </bean>
+
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="name" value="rebalance2"/>
+
+                    <property name="cacheMode" value="PARTITIONED"/>
+
+                    <property name="atomicityMode" value="ATOMIC"/>
+
+                    <property name="swapEnabled" value="false"/>
+
+                    <property name="backups" value="1"/>
+
+                    <property name="rebalanceBatchesCount" value="2"/>
+                </bean>
+
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="name" value="rebalance3"/>
+
+                    <property name="cacheMode" value="PARTITIONED"/>
+
+                    <property name="atomicityMode" value="ATOMIC"/>
+
+                    <property name="swapEnabled" value="false"/>
+
+                    <property name="backups" value="1"/>
+
+                    <property name="rebalanceBatchesCount" value="3"/>
+                </bean>
+
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="name" value="rebalance5"/>
+
+                    <property name="cacheMode" value="PARTITIONED"/>
+
+                    <property name="atomicityMode" value="ATOMIC"/>
+
+                    <property name="swapEnabled" value="false"/>
+
+                    <property name="backups" value="1"/>
+
+                    <property name="rebalanceBatchesCount" value="5"/>
+                </bean>
+
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="name" value="rebalance3-1024"/>
+
+                    <property name="cacheMode" value="PARTITIONED"/>
+
+                    <property name="atomicityMode" value="ATOMIC"/>
+
+                    <property name="swapEnabled" value="false"/>
+
+                    <property name="backups" value="1"/>
+
+                    <property name="rebalanceBatchesCount" value="3"/>
+
+                    <property name="rebalanceBatchSize" value="1024"/>
+                </bean>
+
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="name" value="rebalance3-10024"/>
+
+                    <property name="cacheMode" value="PARTITIONED"/>
+
+                    <property name="atomicityMode" value="ATOMIC"/>
+
+                    <property name="swapEnabled" value="false"/>
+
+                    <property name="backups" value="1"/>
+
+                    <property name="rebalanceBatchesCount" value="3"/>
+
+                    <property name="rebalanceBatchSize" value="10024"/>
+                </bean>
+
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="name" value="rebalance3-100024"/>
+
+                    <property name="cacheMode" value="PARTITIONED"/>
+
+                    <property name="atomicityMode" value="ATOMIC"/>
+
+                    <property name="swapEnabled" value="false"/>
+
+                    <property name="backups" value="1"/>
+
+                    <property name="rebalanceBatchesCount" value="3"/>
+
+                    <property name="rebalanceBatchSize" value="100024"/>
+                </bean>
+
+            </list>
+        </property>
+
+        <property name="failureDetectionTimeout" value="1000"/>
+
+        <property name="metricsLogFrequency" value="200"/>
+
+        <property name="warmupClosure" ref="warmupClosure"/>
+
+        <property name="rebalanceThreadPoolSize" value="0"/>
+    </bean>
+
+    <bean id="warmupClosure" class="org.apache.ignite.startup.BasicWarmupClosure">
+
+    </bean>
+</beans>

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/yardstick/config/ignite-rebalancing-multicast-win-config.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/ignite-rebalancing-multicast-win-config.xml b/modules/yardstick/config/ignite-rebalancing-multicast-win-config.xml
new file mode 100644
index 0000000..0e55019
--- /dev/null
+++ b/modules/yardstick/config/ignite-rebalancing-multicast-win-config.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  ~  Licensed to the Apache Software Foundation (ASF) under one or more
+  ~  contributor license agreements.  See the NOTICE file distributed with
+  ~  this work for additional information regarding copyright ownership.
+  ~  The ASF licenses this file to You under the Apache License, Version 2.0
+  ~  (the "License"); you may not use this file except in compliance with
+  ~  the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<!--
+    Ignite Spring configuration file to startup grid.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="gridLogger">
+            <bean class="org.apache.ignite.logger.log4j.Log4JLogger">
+                <constructor-arg type="java.lang.String" value="config/ignite-log4j.xml"/>
+            </bean>
+        </property>
+
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="ipFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+                        <property name="addresses">
+                            <list>
+                                <value>127.0.0.1:47500..47509</value>
+                            </list>
+                        </property>
+                    </bean>
+                </property>
+            </bean>
+        </property>
+
+        <property name="cacheConfiguration">
+            <list>
+                <bean id="rebalance2" class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="name" value="rebalance2"/>
+
+                    <property name="cacheMode" value="PARTITIONED"/>
+
+                    <property name="atomicityMode" value="ATOMIC"/>
+
+                    <property name="swapEnabled" value="false"/>
+
+                    <property name="backups" value="1"/>
+                </bean>
+            </list>
+        </property>
+
+        <property name="failureDetectionTimeout" value="1000"/>
+
+        <property name="metricsLogFrequency" value="200"/>
+    </bean>
+</beans>

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/yardstick/pom.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/pom.xml b/modules/yardstick/pom.xml
index dc4a033..553158c 100644
--- a/modules/yardstick/pom.xml
+++ b/modules/yardstick/pom.xml
@@ -34,7 +34,7 @@
     <version>1.4.1-SNAPSHOT</version>
 
     <properties>
-        <yardstick.version>0.7.0</yardstick.version>
+        <yardstick.version>0.7.1</yardstick.version>
     </properties>
 
 
@@ -98,6 +98,12 @@
             <artifactId>spring-aop</artifactId>
             <version>${spring.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>


[4/6] ignite git commit: Ignite-1093 Improved rebalancing

Posted by sb...@apache.org.
Ignite-1093
Improved rebalancing


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9d5718f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9d5718f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9d5718f2

Branch: refs/heads/ignite-1093-2
Commit: 9d5718f2e20d07f754ce7b5802a08a5b4d491d10
Parents: 37a0505
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Aug 28 19:20:08 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Aug 28 19:20:08 2015 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |   40 +-
 .../configuration/IgniteConfiguration.java      |   30 +
 .../apache/ignite/internal/IgniteKernal.java    |    1 +
 .../ignite/internal/IgniteNodeAttributes.java   |    3 +
 .../communication/GridIoMessageFactory.java     |    7 +-
 .../processors/cache/GridCacheIoManager.java    |    8 +
 .../GridCachePartitionExchangeManager.java      |   59 +-
 .../processors/cache/GridCachePreloader.java    |   20 +-
 .../cache/GridCachePreloaderAdapter.java        |   12 +-
 .../processors/cache/GridCacheProcessor.java    |    4 +-
 .../preloader/GridDhtPartitionDemandPool.java   | 1161 ---------------
 .../dht/preloader/GridDhtPartitionDemander.java | 1362 ++++++++++++++++++
 .../dht/preloader/GridDhtPartitionSupplier.java |  783 ++++++++++
 .../GridDhtPartitionSupplyMessageV2.java        |  423 ++++++
 .../preloader/GridDhtPartitionSupplyPool.java   |  545 -------
 .../dht/preloader/GridDhtPreloader.java         |  195 ++-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |    2 +-
 ...CacheP2pUnmarshallingRebalanceErrorTest.java |    3 +-
 .../GridCacheRebalancingAsyncSelfTest.java      |   85 ++
 .../GridCacheRebalancingSyncSelfTest.java       |  270 ++++
 .../GridCacheReplicatedPreloadSelfTest.java     |   20 -
 .../testsuites/IgniteCacheTestSuite3.java       |    4 +
 .../config/benchmark-rebalancing-win.properties |   60 +
 .../config/benchmark-rebalancing.properties     |   79 +
 modules/yardstick/config/ignite-log4j.xml       |  143 ++
 .../ignite-rebalancing-multicast-config.xml     |  174 +++
 .../ignite-rebalancing-multicast-win-config.xml |   70 +
 modules/yardstick/pom.xml                       |    8 +-
 .../yardstick/IgniteBenchmarkArguments.java     |   13 +-
 .../cache/IgniteCacheAbstractBenchmark.java     |    5 +-
 .../IgniteRebalancePutGetBenchmark.java         |   72 +
 31 files changed, 3876 insertions(+), 1785 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index af2bbe8..8057329 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -57,6 +57,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Default rebalance timeout (ms).*/
     public static final long DFLT_REBALANCE_TIMEOUT = 10000;
 
+    /** Default rebalance batches count. */
+    public static final long DFLT_REBALANCE_BATCHES_COUNT = 3;
+
     /** Time in milliseconds to wait between rebalance messages to avoid overloading CPU. */
     public static final long DFLT_REBALANCE_THROTTLE = 0;
 
@@ -247,6 +250,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Off-heap memory size. */
     private long offHeapMaxMem = DFLT_OFFHEAP_MEMORY;
 
+    /** Rebalance batches count. */
+    private long rebalanceBatchesCount = DFLT_REBALANCE_BATCHES_COUNT;
+
     /** */
     private boolean swapEnabled = DFLT_SWAP_ENABLED;
 
@@ -1293,11 +1299,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         return this;
     }
 
+    @Deprecated
     /**
-     * Gets size of rebalancing thread pool. Note that size serves as a hint and implementation
-     * may create more threads for rebalancing than specified here (but never less threads).
-     * <p>
-     * Default value is {@link #DFLT_REBALANCE_THREAD_POOL_SIZE}.
+     * Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead.
      *
      * @return Size of rebalancing thread pool.
      */
@@ -1305,9 +1309,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         return rebalancePoolSize;
     }
 
+    @Deprecated
     /**
-     * Sets size of rebalancing thread pool. Note that size serves as a hint and implementation may create more threads
-     * for rebalancing than specified here (but never less threads).
+     * Use {@link IgniteConfiguration#setRebalanceThreadPoolSize(int)} instead.
      *
      * @param rebalancePoolSize Size of rebalancing thread pool.
      * @return {@code this} for chaining.
@@ -1791,6 +1795,30 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     }
 
     /**
+     * To gain better rebalancing performance, the supplier node can provide more than one batch at start
+     * and then provide one new batch for each subsequent demand request.
+     *
+     * Gets number of batches generated by supply node at rebalancing start.
+     *
+     * @return batches count
+     */
+    public long getRebalanceBatchesCount() {
+        return rebalanceBatchesCount;
+    }
+
+    /**
+     * Sets number of batches generated by supply node at rebalancing start.
+     *
+     * @param rebalanceBatchesCnt batches count.
+     * @return {@code this} for chaining.
+     */
+    public CacheConfiguration<K, V> setRebalanceBatchesCount(long rebalanceBatchesCnt) {
+        this.rebalanceBatchesCount = rebalanceBatchesCnt;
+
+        return this;
+    }
+
+    /**
      * Gets cache store session listener factories.
      *
      * @return Cache store session listener factories.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 1fa1de4..06fb152 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -141,6 +141,9 @@ public class IgniteConfiguration {
     /** Default keep alive time for public thread pool. */
     public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
 
+    /** Default limit of threads used at rebalance. 2 demand + 2 supply threads. */
+    public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 4;
+
     /** Default max queue capacity of public thread pool. */
     public static final int DFLT_PUBLIC_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE;
 
@@ -346,6 +349,9 @@ public class IgniteConfiguration {
     /** Client mode flag. */
     private Boolean clientMode;
 
+    /** Rebalance thread pool size. */
+    private int rebalanceThreadPoolSize = DFLT_REBALANCE_THREAD_POOL_SIZE;
+
     /** Transactions configuration. */
     private TransactionConfiguration txCfg = new TransactionConfiguration();
 
@@ -1326,6 +1332,30 @@ public class IgniteConfiguration {
         return this;
     }
 
+
+    /**
+     * Gets count of available rebalancing threads.
+     * Half will be used for supplying and half for demanding of partitions.
+     * Minimum is 2.
+     * @return count.
+     */
+    public int getRebalanceThreadPoolSize(){
+        return rebalanceThreadPoolSize;
+    }
+
+    /**
+     * Sets count of available rebalancing threads.
+     * Half will be used for supplying and half for demanding of partitions.
+     * Minimum is 2.
+     * @param size Size.
+     * @return {@code this} for chaining.
+     */
+    public IgniteConfiguration setRebalanceThreadPoolSize(int size){
+        this.rebalanceThreadPoolSize = size;
+
+        return this;
+    }
+
     /**
      * Returns a collection of life-cycle beans. These beans will be automatically
      * notified of grid life-cycle events. Use life-cycle beans whenever you

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 1db73bf..03110c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1170,6 +1170,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         add(ATTR_MARSHALLER, cfg.getMarshaller().getClass().getName());
         add(ATTR_USER_NAME, System.getProperty("user.name"));
         add(ATTR_GRID_NAME, gridName);
+        add(REBALANCING_VERSION, 1);
 
         add(ATTR_PEER_CLASSLOADING, cfg.isPeerClassLoadingEnabled());
         add(ATTR_DEPLOYMENT_MODE, cfg.getDeploymentMode());

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index 10b8df0..c04c69b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -135,6 +135,9 @@ public final class IgniteNodeAttributes {
     /** Node consistent id. */
     public static final String ATTR_NODE_CONSISTENT_ID = ATTR_PREFIX + ".consistent.id";
 
+    /** Rebalancing version id. */
+    public static final String REBALANCING_VERSION = ATTR_PREFIX + ".rebalancing.version";
+
     /**
      * Enforces singleton.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 2acfd2b..9d25de2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -606,7 +606,12 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            // [-3..112] - this
+            case 114:
+                msg = new GridDhtPartitionSupplyMessageV2();
+
+                break;
+
+            // [-3..114] - this
             // [120..123] - DR
             // [-4..-22] - SQL
             default:

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 0ef190e..9e8f8ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -502,6 +502,14 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             break;
 
+            case 114: {
+                GridDhtPartitionSupplyMessageV2 req = (GridDhtPartitionSupplyMessageV2)msg;
+
+                U.error(log, "Supply message v2 cannot be unmarshalled.", req.classError());
+            }
+
+            break;
+
             default:
                 throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
                     + msg + "]");

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e00d3b7..bf77d1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -48,7 +48,8 @@ import java.util.concurrent.locks.*;
 import static java.util.concurrent.TimeUnit.*;
 import static org.apache.ignite.IgniteSystemProperties.*;
 import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.*;
 
@@ -274,6 +275,41 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (reconnect)
             reconnectExchangeFut = new GridFutureAdapter<>();
 
+        if (!cctx.kernalContext().clientNode()) {
+            // Non-client nodes only: register one ordered handler pair (demander + supplier topic) per index.
+            for (int cnt = 0; cnt < Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2); cnt++) {
+                final int idx = cnt;
+
+                cctx.io().addOrderedHandler(demanderTopic(cnt), new CI2<UUID, GridDhtPartitionSupplyMessageV2>() {
+                    @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessageV2 m) {
+                        if (!enterBusy())
+                            return;
+
+                        try {
+                            // NOTE(review): cacheContext(...) is dereferenced unchecked here and below - confirm non-null.
+                            cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage(idx, id, m);
+                        }
+                        finally {
+                            leaveBusy();
+                        }
+                    }
+                });
+                cctx.io().addOrderedHandler(supplierTopic(cnt), new CI2<UUID, GridDhtPartitionDemandMessage>() {
+                    @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+                        if (!enterBusy())
+                            return;
+
+                        try {
+                            cctx.cacheContext(m.cacheId).preloader().handleDemandMessage(id, m);
+                        }
+                        finally {
+                            leaveBusy();
+                        }
+                    }
+                });
+            }
+        }
+
         new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start();
 
         onDiscoveryEvent(cctx.localNodeId(), fut);
@@ -336,6 +372,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         }
     }
 
+    /**
+     * @param idx Index of the demander topic (the handler registration loop passes its counter).
+     * @return Topic object produced by {@code TOPIC_CACHE.topic("Demander", idx)}.
+     */
+    public static Object demanderTopic(int idx) {
+        return TOPIC_CACHE.topic("Demander", idx);
+    }
+
+    /**
+     * @param idx Index of the supplier topic (the handler registration loop passes its counter).
+     * @return Topic object produced by {@code TOPIC_CACHE.topic("Supplier", idx)}.
+     */
+    public static Object supplierTopic(int idx) {
+        return TOPIC_CACHE.topic("Supplier", idx);
+    }
+
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
         cctx.gridEvents().removeLocalEventListener(discoLsnr);
@@ -360,6 +412,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         for (AffinityReadyFuture f : readyFuts.values())
             f.onDone(err);
 
+        for (int cnt = 0; cnt < Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2); cnt++) {
+            cctx.io().removeOrderedHandler(demanderTopic(cnt));
+            cctx.io().removeOrderedHandler(supplierTopic(cnt));
+        }
+
         U.cancel(exchWorker);
 
         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index b8bb08e..105bec2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -91,7 +91,7 @@ public interface GridCachePreloader {
      * @param assignments Assignments to add.
      * @param forcePreload Force preload flag.
      */
-    public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload);
+    public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException;
 
     /**
      * @param p Preload predicate.
@@ -132,4 +132,22 @@ public interface GridCachePreloader {
      * Unwinds undeploys.
      */
     public void unwindUndeploys();
+
+
+    /**
+     * Handles a supply message dispatched by an ordered handler registered on a demander topic.
+     *
+     * @param idx Index of the demander topic handler that dispatched the message.
+     * @param id Node ID passed to the ordered handler (presumably the sender; confirm).
+     * @param s Supply message.
+     */
+    public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s);
+
+    /**
+     * Handles a demand message dispatched by an ordered handler registered on a supplier topic.
+     *
+     * @param id Node ID passed to the ordered handler (presumably the sender; confirm).
+     * @param d Demand message.
+     */
+    public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 0adf510..527e5bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -117,6 +117,16 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
+    @Override public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessageV2 s) {
+        // No-op: this adapter implementation ignores supply messages.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d) {
+        // No-op: this adapter implementation ignores demand messages.
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
         return new GridFinishedFuture<>();
     }
@@ -142,7 +152,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
+    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException {
         // No-op.
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index dd4d30b..f283008 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -324,10 +324,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             U.warn(log, "AffinityFunction configuration parameter will be ignored for local cache [cacheName=" +
                 U.maskName(cc.getName()) + ']');
 
-        if (cc.getRebalanceMode() != CacheRebalanceMode.NONE) {
-            assertParameter(cc.getRebalanceThreadPoolSize() > 0, "rebalanceThreadPoolSize > 0");
+        if (cc.getRebalanceMode() != CacheRebalanceMode.NONE)
             assertParameter(cc.getRebalanceBatchSize() > 0, "rebalanceBatchSize > 0");
-        }
 
         if (cc.getCacheMode() == PARTITIONED || cc.getCacheMode() == REPLICATED) {
             if (cc.getAtomicityMode() == ATOMIC && cc.getWriteSynchronizationMode() == FULL_ASYNC)

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d5718f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
deleted file mode 100644
index a6e6c4d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ /dev/null
@@ -1,1161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.processors.timeout.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
-import static org.apache.ignite.internal.processors.dr.GridDrType.*;
-
-/**
- * Thread pool for requesting partitions from other nodes
- * and populating local cache.
- */
-@SuppressWarnings("NonConstantFieldWithUpperCaseName")
-public class GridDhtPartitionDemandPool {
-    /** Dummy message to wake up a blocking queue if a node leaves. */
-    private final SupplyMessage DUMMY_TOP = new SupplyMessage();
-
-    /** */
-    private final GridCacheContext<?, ?> cctx;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private final ReadWriteLock busyLock;
-
-    /** */
-    @GridToStringInclude
-    private final Collection<DemandWorker> dmdWorkers;
-
-    /** Preload predicate. */
-    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
-    /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
-    @GridToStringInclude
-    private SyncFuture syncFut;
-
-    /** Preload timeout. */
-    private final AtomicLong timeout;
-
-    /** Allows demand threads to synchronize their step. */
-    private CyclicBarrier barrier;
-
-    /** Demand lock. */
-    private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
-
-    /** */
-    private int poolSize;
-
-    /** Last timeout object. */
-    private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
-
-    /** Last exchange future. */
-    private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
-
-    /**
-     * @param cctx Cache context.
-     * @param busyLock Shutdown lock.
-     */
-    public GridDhtPartitionDemandPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
-        assert cctx != null;
-        assert busyLock != null;
-
-        this.cctx = cctx;
-        this.busyLock = busyLock;
-
-        log = cctx.logger(getClass());
-
-        boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
-
-        poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
-        if (enabled) {
-            barrier = new CyclicBarrier(poolSize);
-
-            dmdWorkers = new ArrayList<>(poolSize);
-
-            for (int i = 0; i < poolSize; i++)
-                dmdWorkers.add(new DemandWorker(i));
-
-            syncFut = new SyncFuture(dmdWorkers);
-        }
-        else {
-            dmdWorkers = Collections.emptyList();
-
-            syncFut = new SyncFuture(dmdWorkers);
-
-            // Calling onDone() immediately since preloading is disabled.
-            syncFut.onDone();
-        }
-
-        timeout = new AtomicLong(cctx.config().getRebalanceTimeout());
-    }
-
-    /**
-     *
-     */
-    void start() {
-        if (poolSize > 0) {
-            for (DemandWorker w : dmdWorkers)
-                new IgniteThread(cctx.gridName(), "preloader-demand-worker", w).start();
-        }
-    }
-
-    /**
-     *
-     */
-    void stop() {
-        U.cancel(dmdWorkers);
-
-        if (log.isDebugEnabled())
-            log.debug("Before joining on demand workers: " + dmdWorkers);
-
-        U.join(dmdWorkers, log);
-
-        if (log.isDebugEnabled())
-            log.debug("After joining on demand workers: " + dmdWorkers);
-
-        lastExchangeFut = null;
-
-        lastTimeoutObj.set(null);
-    }
-
-    /**
-     * @return Future for {@link CacheRebalanceMode#SYNC} mode.
-     */
-    IgniteInternalFuture<?> syncFuture() {
-        return syncFut;
-    }
-
-    /**
-     * Sets preload predicate for demand pool.
-     *
-     * @param preloadPred Preload predicate.
-     */
-    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
-        this.preloadPred = preloadPred;
-    }
-
-    /**
-     * @return Size of this thread pool.
-     */
-    int poolSize() {
-        return poolSize;
-    }
-
-    /**
-     * Wakes up demand workers when new exchange future was added.
-     */
-    void onExchangeFutureAdded() {
-        synchronized (dmdWorkers) {
-            for (DemandWorker w : dmdWorkers)
-                w.addMessage(DUMMY_TOP);
-        }
-    }
-
-    /**
-     * Force preload.
-     */
-    void forcePreload() {
-        GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
-
-        if (obj != null)
-            cctx.time().removeTimeoutObject(obj);
-
-        final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
-
-        if (exchFut != null) {
-            if (log.isDebugEnabled())
-                log.debug("Forcing rebalance event for future: " + exchFut);
-
-            exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                    cctx.shared().exchange().forcePreloadExchange(exchFut);
-                }
-            });
-        }
-        else if (log.isDebugEnabled())
-            log.debug("Ignoring force rebalance request (no topology event happened yet).");
-    }
-
-    /**
-     * @return {@code true} if entered to busy state.
-     */
-    private boolean enterBusy() {
-        if (busyLock.readLock().tryLock())
-            return true;
-
-        if (log.isDebugEnabled())
-            log.debug("Failed to enter to busy state (demander is stopping): " + cctx.nodeId());
-
-        return false;
-    }
-
-    /**
-     *
-     */
-    private void leaveBusy() {
-        busyLock.readLock().unlock();
-    }
-
-    /**
-     * @param type Type.
-     * @param discoEvt Discovery event.
-     */
-    private void preloadEvent(int type, DiscoveryEvent discoEvt) {
-        preloadEvent(-1, type, discoEvt);
-    }
-
-    /**
-     * @param part Partition.
-     * @param type Type.
-     * @param discoEvt Discovery event.
-     */
-    private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
-        assert discoEvt != null;
-
-        cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
-    }
-
-    /**
-     * @param msg Message to check.
-     * @return {@code True} if dummy message.
-     */
-    private boolean dummyTopology(SupplyMessage msg) {
-        return msg == DUMMY_TOP;
-    }
-
-    /**
-     * @param deque Deque to poll from.
-     * @param time Time to wait.
-     * @param w Worker.
-     * @return Polled item.
-     * @throws InterruptedException If interrupted.
-     */
-    @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
-        assert w != null;
-
-        // There is currently a case where {@code interrupted}
-        // flag on a thread gets flipped during stop which causes the pool to hang.  This check
-        // will always make sure that interrupted flag gets reset before going into wait conditions.
-        // The true fix should actually make sure that interrupted flag does not get reset or that
-        // interrupted exception gets propagated. Until we find a real fix, this method should
-        // always work to make sure that there is no hanging during stop.
-        if (w.isCancelled())
-            Thread.currentThread().interrupt();
-
-        return deque.poll(time, MILLISECONDS);
-    }
-
-    /**
-     * @param p Partition.
-     * @param topVer Topology version.
-     * @return Picked owners.
-     */
-    private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
-        Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
-
-        int affCnt = affNodes.size();
-
-        Collection<ClusterNode> rmts = remoteOwners(p, topVer);
-
-        int rmtCnt = rmts.size();
-
-        if (rmtCnt <= affCnt)
-            return rmts;
-
-        List<ClusterNode> sorted = new ArrayList<>(rmts);
-
-        // Sort in descending order, so nodes with higher order will be first.
-        Collections.sort(sorted, CU.nodeComparator(false));
-
-        // Pick newest nodes.
-        return sorted.subList(0, affCnt);
-    }
-
-    /**
-     * @param p Partition.
-     * @param topVer Topology version.
-     * @return Nodes owning this partition.
-     */
-    private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
-        return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
-    }
-
-    /**
-     * @param assigns Assignments.
-     * @param force {@code True} if dummy reassign.
-     */
-    void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
-        if (log.isDebugEnabled())
-            log.debug("Adding partition assignments: " + assigns);
-
-        long delay = cctx.config().getRebalanceDelay();
-
-        if (delay == 0 || force) {
-            assert assigns != null;
-
-            synchronized (dmdWorkers) {
-                for (DemandWorker w : dmdWorkers) {
-                    w.addAssignments(assigns);
-
-                    w.addMessage(DUMMY_TOP);
-                }
-            }
-        }
-        else if (delay > 0) {
-            assert !force;
-
-            GridTimeoutObject obj = lastTimeoutObj.get();
-
-            if (obj != null)
-                cctx.time().removeTimeoutObject(obj);
-
-            final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
-
-            assert exchFut != null : "Delaying rebalance process without topology event.";
-
-            obj = new GridTimeoutObjectAdapter(delay) {
-                @Override public void onTimeout() {
-                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
-                            cctx.shared().exchange().forcePreloadExchange(exchFut);
-                        }
-                    });
-                }
-            };
-
-            lastTimeoutObj.set(obj);
-
-            cctx.time().addTimeoutObject(obj);
-        }
-    }
-
-    /**
-     *
-     */
-    void unwindUndeploys() {
-        demandLock.writeLock().lock();
-
-        try {
-            cctx.deploy().unwind(cctx);
-        }
-        finally {
-            demandLock.writeLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtPartitionDemandPool.class, this);
-    }
-
-    /**
-     *
-     */
-    private class DemandWorker extends GridWorker {
-        /** Worker ID. */
-        private int id;
-
-        /** Partition-to-node assignments. */
-        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
-
-        /** Message queue. */
-        private final LinkedBlockingDeque<SupplyMessage> msgQ =
-            new LinkedBlockingDeque<>();
-
-        /** Counter. */
-        private long cntr;
-
-        /** Hide worker logger and use cache logger instead. */
-        private IgniteLogger log = GridDhtPartitionDemandPool.this.log;
-
-        /**
-         * @param id Worker ID.
-         */
-        private DemandWorker(int id) {
-            super(cctx.gridName(), "preloader-demand-worker", GridDhtPartitionDemandPool.this.log);
-
-            assert id >= 0;
-
-            this.id = id;
-        }
-
-        /**
-         * @param assigns Assignments.
-         */
-        void addAssignments(GridDhtPreloaderAssignments assigns) {
-            assert assigns != null;
-
-            assignQ.offer(assigns);
-
-            if (log.isDebugEnabled())
-                log.debug("Added assignments to worker: " + this);
-        }
-
-        /**
-         * @return {@code True} if topology changed.
-         */
-        private boolean topologyChanged() {
-            return !assignQ.isEmpty() || cctx.shared().exchange().topologyChanged();
-        }
-
-        /**
-         * @param msg Message.
-         */
-        private void addMessage(SupplyMessage msg) {
-            if (!enterBusy())
-                return;
-
-            try {
-                assert dummyTopology(msg) || msg.supply().workerId() == id;
-
-                msgQ.offer(msg);
-            }
-            finally {
-                leaveBusy();
-            }
-        }
-
-        /**
-         * @param timeout Timed out value.
-         */
-        private void growTimeout(long timeout) {
-            long newTimeout = (long)(timeout * 1.5D);
-
-            // Account for overflow.
-            if (newTimeout < 0)
-                newTimeout = Long.MAX_VALUE;
-
-            // Grow by 50% only if another thread didn't do it already.
-            if (GridDhtPartitionDemandPool.this.timeout.compareAndSet(timeout, newTimeout))
-                U.warn(log, "Increased rebalancing message timeout from " + timeout + "ms to " +
-                    newTimeout + "ms.");
-        }
-
-        /**
-         * @param pick Node picked for preloading.
-         * @param p Partition.
-         * @param entry Preloaded entry.
-         * @param topVer Topology version.
-         * @return {@code False} if partition has become invalid during preloading.
-         * @throws IgniteInterruptedCheckedException If interrupted.
-         */
-        private boolean preloadEntry(
-            ClusterNode pick,
-            int p,
-            GridCacheEntryInfo entry,
-            AffinityTopologyVersion topVer
-        ) throws IgniteCheckedException {
-            try {
-                GridCacheEntryEx cached = null;
-
-                try {
-                    cached = cctx.dht().entryEx(entry.key());
-
-                    if (log.isDebugEnabled())
-                        log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
-
-                    if (cctx.dht().isIgfsDataCache() &&
-                        cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
-                        LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
-                            "value, will ignore rebalance entries): " + name());
-
-                        if (cached.markObsoleteIfEmpty(null))
-                            cached.context().cache().removeIfObsolete(cached.key());
-
-                        return true;
-                    }
-
-                    if (preloadPred == null || preloadPred.apply(entry)) {
-                        if (cached.initialValue(
-                            entry.value(),
-                            entry.version(),
-                            entry.ttl(),
-                            entry.expireTime(),
-                            true,
-                            topVer,
-                            cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
-                        )) {
-                            cctx.evicts().touch(cached, topVer); // Start tracking.
-
-                            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
-                                cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
-                                    (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
-                                    false, null, null, null);
-                        }
-                        else if (log.isDebugEnabled())
-                            log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
-                                ", part=" + p + ']');
-                    }
-                    else if (log.isDebugEnabled())
-                        log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
-                }
-                catch (GridCacheEntryRemovedException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
-                            cached.key() + ", part=" + p + ']');
-                }
-                catch (GridDhtInvalidPartitionException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Partition became invalid during rebalancing (will ignore): " + p);
-
-                    return false;
-                }
-            }
-            catch (IgniteInterruptedCheckedException e) {
-                throw e;
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
-                    cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
-            }
-
-            return true;
-        }
-
-        /**
-         * @param idx Unique index for this topic.
-         * @return Topic for partition.
-         */
-        public Object topic(long idx) {
-            return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
-        }
-
-        /**
-         * @param node Node to demand from.
-         * @param topVer Topology version.
-         * @param d Demand message.
-         * @param exchFut Exchange future.
-         * @return Missed partitions.
-         * @throws InterruptedException If interrupted.
-         * @throws ClusterTopologyCheckedException If node left.
-         * @throws IgniteCheckedException If failed to send message.
-         */
-        private Set<Integer> demandFromNode(
-            ClusterNode node,
-            final AffinityTopologyVersion topVer,
-            GridDhtPartitionDemandMessage d,
-            GridDhtPartitionsExchangeFuture exchFut
-        ) throws InterruptedException, IgniteCheckedException {
-            GridDhtPartitionTopology top = cctx.dht().topology();
-
-            cntr++;
-
-            d.topic(topic(cntr));
-            d.workerId(id);
-
-            Set<Integer> missed = new HashSet<>();
-
-            // Get the same collection that will be sent in the message.
-            Collection<Integer> remaining = d.partitions();
-
-            // Drain queue before processing a new node.
-            drainQueue();
-
-            if (isCancelled() || topologyChanged())
-                return missed;
-
-            cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
-                    addMessage(new SupplyMessage(nodeId, msg));
-                }
-            });
-
-            try {
-                boolean retry;
-
-                // DoWhile.
-                // =======
-                do {
-                    retry = false;
-
-                    // Create copy.
-                    d = new GridDhtPartitionDemandMessage(d, remaining);
-
-                    long timeout = GridDhtPartitionDemandPool.this.timeout.get();
-
-                    d.timeout(timeout);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
-
-                    // Send demand message.
-                    cctx.io().send(node, d, cctx.ioPolicy());
-
-                    // While.
-                    // =====
-                    while (!isCancelled() && !topologyChanged()) {
-                        SupplyMessage s = poll(msgQ, timeout, this);
-
-                        // If timed out.
-                        if (s == null) {
-                            if (msgQ.isEmpty()) { // Safety check.
-                                U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
-                                    " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
-                                    " configuration properties).");
-
-                                growTimeout(timeout);
-
-                                // Ordered listener was removed if timeout expired.
-                                cctx.io().removeOrderedHandler(d.topic());
-
-                                // Must create copy to be able to work with IO manager thread local caches.
-                                d = new GridDhtPartitionDemandMessage(d, remaining);
-
-                                // Create new topic.
-                                d.topic(topic(++cntr));
-
-                                // Create new ordered listener.
-                                cctx.io().addOrderedHandler(d.topic(),
-                                    new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                                        @Override public void apply(UUID nodeId,
-                                            GridDhtPartitionSupplyMessage msg) {
-                                            addMessage(new SupplyMessage(nodeId, msg));
-                                        }
-                                    });
-
-                                // Resend message with larger timeout.
-                                retry = true;
-
-                                break; // While.
-                            }
-                            else
-                                continue; // While.
-                        }
-
-                        // If topology changed.
-                        if (dummyTopology(s)) {
-                            if (topologyChanged())
-                                break; // While.
-                            else
-                                continue; // While.
-                        }
-
-                        // Check that message was received from expected node.
-                        if (!s.senderId().equals(node.id())) {
-                            U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
-                                ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
-
-                            continue; // While.
-                        }
-
-                        if (log.isDebugEnabled())
-                            log.debug("Received supply message: " + s);
-
-                        GridDhtPartitionSupplyMessage supply = s.supply();
-
-                        // Check whether there were class loading errors on unmarshal
-                        if (supply.classError() != null) {
-                            if (log.isDebugEnabled())
-                                log.debug("Class got undeployed during preloading: " + supply.classError());
-
-                            retry = true;
-
-                            // Quit preloading.
-                            break;
-                        }
-
-                        // Preload.
-                        for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
-                            int p = e.getKey();
-
-                            if (cctx.affinity().localNode(p, topVer)) {
-                                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
-                                assert part != null;
-
-                                if (part.state() == MOVING) {
-                                    boolean reserved = part.reserve();
-
-                                    assert reserved : "Failed to reserve partition [gridName=" +
-                                        cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
-
-                                    part.lock();
-
-                                    try {
-                                        Collection<Integer> invalidParts = new GridLeanSet<>();
-
-                                        // Loop through all received entries and try to preload them.
-                                        for (GridCacheEntryInfo entry : e.getValue().infos()) {
-                                            if (!invalidParts.contains(p)) {
-                                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
-                                                    if (log.isDebugEnabled())
-                                                        log.debug("Preloading is not permitted for entry due to " +
-                                                            "evictions [key=" + entry.key() +
-                                                            ", ver=" + entry.version() + ']');
-
-                                                    continue;
-                                                }
-
-                                                if (!preloadEntry(node, p, entry, topVer)) {
-                                                    invalidParts.add(p);
-
-                                                    if (log.isDebugEnabled())
-                                                        log.debug("Got entries for invalid partition during " +
-                                                            "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
-                                                }
-                                            }
-                                        }
-
-                                        boolean last = supply.last().contains(p);
-
-                                        // If message was last for this partition,
-                                        // then we take ownership.
-                                        if (last) {
-                                            remaining.remove(p);
-
-                                            top.own(part);
-
-                                            if (log.isDebugEnabled())
-                                                log.debug("Finished rebalancing partition: " + part);
-
-                                            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
-                                                preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
-                                                    exchFut.discoveryEvent());
-                                        }
-                                    }
-                                    finally {
-                                        part.unlock();
-                                        part.release();
-                                    }
-                                }
-                                else {
-                                    remaining.remove(p);
-
-                                    if (log.isDebugEnabled())
-                                        log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
-                                }
-                            }
-                            else {
-                                remaining.remove(p);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
-                            }
-                        }
-
-                        remaining.removeAll(s.supply().missed());
-
-                        // Only request partitions based on latest topology version.
-                        for (Integer miss : s.supply().missed())
-                            if (cctx.affinity().localNode(miss, topVer))
-                                missed.add(miss);
-
-                        if (remaining.isEmpty())
-                            break; // While.
-
-                        if (s.supply().ack()) {
-                            retry = true;
-
-                            break;
-                        }
-                    }
-                }
-                while (retry && !isCancelled() && !topologyChanged());
-
-                return missed;
-            }
-            finally {
-                cctx.io().removeOrderedHandler(d.topic());
-            }
-        }
-
-        /**
-         * @throws InterruptedException If interrupted.
-         */
-        private void drainQueue() throws InterruptedException {
-            while (!msgQ.isEmpty()) {
-                SupplyMessage msg = msgQ.take();
-
-                if (log.isDebugEnabled())
-                    log.debug("Drained supply message: " + msg);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            try {
-                int rebalanceOrder = cctx.config().getRebalanceOrder();
-
-                if (!CU.isMarshallerCache(cctx.name())) {
-                    if (log.isDebugEnabled())
-                        log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
-
-                    try {
-                        cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
-                    }
-                    catch (IgniteInterruptedCheckedException ignored) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
-                                "[cacheName=" + cctx.name() + ']');
-
-                        return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
-                    }
-                }
-
-                if (rebalanceOrder > 0) {
-                    IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
-
-                    try {
-                        if (fut != null) {
-                            if (log.isDebugEnabled())
-                                log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
-                                    ", rebalanceOrder=" + rebalanceOrder + ']');
-
-                            fut.get();
-                        }
-                    }
-                    catch (IgniteInterruptedCheckedException ignored) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
-                                "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
-
-                        return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
-                    }
-                }
-
-                GridDhtPartitionsExchangeFuture exchFut = null;
-
-                boolean stopEvtFired = false;
-
-                while (!isCancelled()) {
-                    try {
-                        barrier.await();
-
-                        if (id == 0 && exchFut != null && !exchFut.dummy() &&
-                            cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED)) {
-
-                            if (!cctx.isReplicated() || !stopEvtFired) {
-                                preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
-                                stopEvtFired = true;
-                            }
-                        }
-                    }
-                    catch (BrokenBarrierException ignore) {
-                        throw new InterruptedException("Demand worker stopped.");
-                    }
-
-                    // Sync up all demand threads at this step.
-                    GridDhtPreloaderAssignments assigns = null;
-
-                    while (assigns == null)
-                        assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
-
-                    demandLock.readLock().lock();
-
-                    try {
-                        exchFut = assigns.exchangeFuture();
-
-                        // Assignments are empty if preloading is disabled.
-                        if (assigns.isEmpty())
-                            continue;
-
-                        boolean resync = false;
-
-                        // While.
-                        // =====
-                        while (!isCancelled() && !topologyChanged() && !resync) {
-                            Collection<Integer> missed = new HashSet<>();
-
-                            // For.
-                            // ===
-                            for (ClusterNode node : assigns.keySet()) {
-                                if (topologyChanged() || isCancelled())
-                                    break; // For.
-
-                                GridDhtPartitionDemandMessage d = assigns.remove(node);
-
-                                // If another thread is already processing this message,
-                                // move to the next node.
-                                if (d == null)
-                                    continue; // For.
-
-                                try {
-                                    Set<Integer> set = demandFromNode(node, assigns.topologyVersion(), d, exchFut);
-
-                                    if (!set.isEmpty()) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" +
-                                                set + ']');
-
-                                        missed.addAll(set);
-                                    }
-                                }
-                                catch (IgniteInterruptedCheckedException e) {
-                                    throw e;
-                                }
-                                catch (ClusterTopologyCheckedException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
-                                            ", msg=" + e.getMessage() + ']');
-
-                                    resync = true;
-
-                                    break; // For.
-                                }
-                                catch (IgniteCheckedException e) {
-                                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
-                                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
-                                }
-                            }
-
-                            // Processed missed entries.
-                            if (!missed.isEmpty()) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Reassigning partitions that were missed: " + missed);
-
-                                assert exchFut.exchangeId() != null;
-
-                                cctx.shared().exchange().forceDummyExchange(true, exchFut);
-                            }
-                            else
-                                break; // While.
-                        }
-                    }
-                    finally {
-                        demandLock.readLock().unlock();
-
-                        syncFut.onWorkerDone(this);
-                    }
-
-                    cctx.shared().exchange().scheduleResendPartitions();
-                }
-            }
-            finally {
-                // Safety.
-                syncFut.onWorkerDone(this);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
-        }
-    }
-
-    /**
-     * Sets last exchange future.
-     *
-     * @param lastFut Last future to set.
-     */
-    void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
-        lastExchangeFut = lastFut;
-    }
-
-    /**
-     * @param exchFut Exchange future.
-     * @return Assignments of partitions to nodes.
-     */
-    GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
-        // No assignments for disabled preloader.
-        GridDhtPartitionTopology top = cctx.dht().topology();
-
-        if (!cctx.rebalanceEnabled())
-            return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
-        int partCnt = cctx.affinity().partitions();
-
-        assert exchFut.forcePreload() || exchFut.dummyReassign() ||
-            exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
-            "Topology version mismatch [exchId=" + exchFut.exchangeId() +
-                ", topVer=" + top.topologyVersion() + ']';
-
-        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
-        AffinityTopologyVersion topVer = assigns.topologyVersion();
-
-        for (int p = 0; p < partCnt; p++) {
-            if (cctx.shared().exchange().hasPendingExchange()) {
-                if (log.isDebugEnabled())
-                    log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
-                        exchFut.exchangeId());
-
-                break;
-            }
-
-            // If partition belongs to local node.
-            if (cctx.affinity().localNode(p, topVer)) {
-                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
-                assert part != null;
-                assert part.id() == p;
-
-                if (part.state() != MOVING) {
-                    if (log.isDebugEnabled())
-                        log.debug("Skipping partition assignment (state is not MOVING): " + part);
-
-                    continue; // For.
-                }
-
-                Collection<ClusterNode> picked = pickedOwners(p, topVer);
-
-                if (picked.isEmpty()) {
-                    top.own(part);
-
-                    if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
-                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
-                        cctx.events().addPreloadEvent(p,
-                            EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
-                            discoEvt.type(), discoEvt.timestamp());
-                    }
-
-                    if (log.isDebugEnabled())
-                        log.debug("Owning partition as there are no other owners: " + part);
-                }
-                else {
-                    ClusterNode n = F.first(picked);
-
-                    GridDhtPartitionDemandMessage msg = assigns.get(n);
-
-                    if (msg == null) {
-                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
-                            top.updateSequence(),
-                            exchFut.exchangeId().topologyVersion(),
-                            cctx.cacheId()));
-                    }
-
-                    msg.addPartition(p);
-                }
-            }
-        }
-
-        return assigns;
-    }
-
-    /**
-     *
-     */
-    private class SyncFuture extends GridFutureAdapter<Object> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Remaining workers. */
-        private Collection<DemandWorker> remaining;
-
-        /**
-         * @param workers List of workers.
-         */
-        private SyncFuture(Collection<DemandWorker> workers) {
-            assert workers.size() == poolSize();
-
-            remaining = Collections.synchronizedList(new LinkedList<>(workers));
-        }
-
-        /**
-         * @param w Worker who iterated through all partitions.
-         */
-        void onWorkerDone(DemandWorker w) {
-            if (isDone())
-                return;
-
-            if (remaining.remove(w))
-                if (log.isDebugEnabled())
-                    log.debug("Completed full partition iteration for worker [worker=" + w + ']');
-
-            if (remaining.isEmpty()) {
-                if (log.isDebugEnabled())
-                    log.debug("Completed sync future.");
-
-                onDone();
-            }
-        }
-    }
-
-    /**
-     * Supply message wrapper.
-     */
-    private static class SupplyMessage {
-        /** Sender ID. */
-        private UUID sndId;
-
-        /** Supply message. */
-        private GridDhtPartitionSupplyMessage supply;
-
-        /**
-         * Dummy constructor.
-         */
-        private SupplyMessage() {
-            // No-op.
-        }
-
-        /**
-         * @param sndId Sender ID.
-         * @param supply Supply message.
-         */
-        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
-            this.sndId = sndId;
-            this.supply = supply;
-        }
-
-        /**
-         * @return Sender ID.
-         */
-        UUID senderId() {
-            return sndId;
-        }
-
-        /**
-         * @return Message.
-         */
-        GridDhtPartitionSupplyMessage supply() {
-            return supply;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(SupplyMessage.class, this);
-        }
-    }
-}


[5/6] ignite git commit: ignite-1093

Posted by sb...@apache.org.
ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/765e2cfb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/765e2cfb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/765e2cfb

Branch: refs/heads/ignite-1093-2
Commit: 765e2cfbd6955996364da8b3320b64b82898daf5
Parents: 9d5718f
Author: Anton Vinogradov <av...@apache.org>
Authored: Mon Aug 31 16:49:24 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Aug 31 16:49:24 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCachePartitionExchangeManager.java       | 1 -
 .../cache/distributed/dht/preloader/GridDhtPartitionDemander.java | 3 +++
 2 files changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/765e2cfb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index bf77d1e..be24022 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -286,7 +286,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             return;
 
                         try {
-
                             cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage(idx, id, m);
                         }
                         finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/765e2cfb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 4fe2153..2d73c19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -475,6 +475,9 @@ public class GridDhtPartitionDemander {
 
         final SyncFuture fut = syncFut;
 
+        if (!fut.topologyVersion().equals(topVer))
+            return;
+
         if (topologyChanged(topVer)) {
             fut.onCancel();
 


[6/6] ignite git commit: ignite-1093

Posted by sb...@apache.org.
ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e6866c5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e6866c5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e6866c5

Branch: refs/heads/ignite-1093-2
Commit: 0e6866c55ae30ed15fa31ab6e422690980f38aef
Parents: 765e2cf
Author: Anton Vinogradov <av...@apache.org>
Authored: Mon Aug 31 17:21:44 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Aug 31 17:21:44 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/preloader/GridDhtPartitionDemander.java     | 5 -----
 1 file changed, 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0e6866c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 2d73c19..bfe1861 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -954,11 +954,6 @@ public class GridDhtPartitionDemander {
                     cctx.events().removeListener(lsnr);
 
                 onDone(completed);
-
-                missed.clear();
-                remaining.clear();
-                started.clear();
-                assigns.clear();
             }
         }
     }