Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/11 18:32:07 UTC

[1/8] incubator-ignite git commit: ignite-1093 Non stop rebalancing

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1093 c92cd899a -> 50e188df2


ignite-1093 Non stop rebalancing


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4776feca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4776feca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4776feca

Branch: refs/heads/ignite-1093
Commit: 4776fecaf059d11d4a4f3ff57634ebad9e41f451
Parents: c92cd89
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Fri Aug 7 18:45:07 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Fri Aug 7 18:45:07 2015 +0300

----------------------------------------------------------------------
 .../preloader/GridDhtPartitionDemandPool.java   | 176 +++++++++----------
 .../preloader/GridDhtPartitionSupplyPool.java   |  80 ++++++---
 .../GridCacheMassiveRebalancingSelfTest.java    |  10 +-
 3 files changed, 144 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4776feca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 0e0bc01..11645e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -225,6 +225,14 @@ public class GridDhtPartitionDemandPool {
     }
 
     /**
+     * @param idx
+     * @return topic
+     */
+    static Object topic(int idx, int cacheId, UUID nodeId) {
+        return TOPIC_CACHE.topic("DemandPool", nodeId, cacheId, idx);//Todo: remove nodeId
+    }
+
+    /**
      *
      */
     private void leaveBusy() {
@@ -537,39 +545,58 @@ public class GridDhtPartitionDemandPool {
             if (isCancelled() || topologyChanged())
                 return missed;
 
-            for (int p : d.partitions()) {
-                cctx.io().addOrderedHandler(topic(p, topVer.topologyVersion()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                    @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
-                        handleSupplyMessage(new SupplyMessage(nodeId, msg), node, topVer, top, remaining,
-                            exchFut, missed, d);
+            int threadCnt = cctx.config().getRebalanceThreadPoolSize(); //todo = getRebalanceThreadPoolSize / assigns.count
+
+            List<Set<Integer>> sParts = new ArrayList<>(threadCnt);
+
+            int cnt = 0;
+
+            while (cnt < threadCnt) {
+                sParts.add(new HashSet<Integer>());
+
+                final int idx = cnt;
+
+                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId(), node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                    @Override public void apply(UUID id, GridDhtPartitionSupplyMessage m) {
+                        handleSupplyMessage(idx, new SupplyMessage(id, m), node, topVer, top,
+                            exchFut, missed, d, remaining);
                     }
                 });
+
+                cnt++;
             }
 
-            try {
-                Iterator<Integer> it = remaining.keySet().iterator();
+            Iterator<Integer> it = d.partitions().iterator();
 
-                final int maxC = cctx.config().getRebalanceThreadPoolSize();
+            cnt = 0;
 
-                int sent = 0;
+            while (it.hasNext()) {
+                sParts.get(cnt % threadCnt).add(it.next());
 
-                while (sent < maxC && it.hasNext()) {
-                    int p = it.next();
+                cnt++;
+            }
 
-                    boolean res = remaining.replace(p, false, true);
+            try {
+                cnt = 0;
 
-                    assert res;
+                while (cnt < threadCnt) {
 
                     // Create copy.
-                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
+                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
 
-                    initD.topic(topic(p, topVer.topologyVersion()));
+                    initD.topic(topic(cnt, cctx.cacheId(),node.id()));
 
-                    // Send initial demand message.
-                    cctx.io().sendOrderedMessage(node,
-                        GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
+                    try {
+                        if (logg && cctx.name().equals("cache"))
+                        System.out.println("D "+cnt + " initial Demand "+" "+cctx.localNode().id());
 
-                    sent++;
+                        cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send partition demand message to local node", e);
+                    }
+
+                    cnt++;
                 }
 
                 do {
@@ -580,41 +607,41 @@ public class GridDhtPartitionDemandPool {
                 return missed;
             }
             finally {
-                for (int p : d.partitions())
-                    cctx.io().removeOrderedHandler(topic(p, topVer.topologyVersion()));
+                cnt = 0;
+
+                while (cnt < threadCnt) {
+                    cctx.io().removeOrderedHandler(topic(cnt,cctx.cacheId(), node.id()));
+
+                    cnt++;
+                }
             }
         }
 
-        /**
-         * @param p
-         * @param topVer
-         * @return topic
-         */
-        private Object topic(int p, long topVer) {
-            return TOPIC_CACHE.topic("DemandPool" + topVer, cctx.cacheId(), p);//Todo topVer as long
-        }
+        boolean logg = false;
 
         /**
          * @param s Supply message.
          * @param node Node.
          * @param topVer Topology version.
          * @param top Topology.
-         * @param remaining Remaining.
          * @param exchFut Exchange future.
          * @param missed Missed.
          * @param d initial DemandMessage.
          */
         private void handleSupplyMessage(
+            int idx,
             SupplyMessage s,
             ClusterNode node,
             AffinityTopologyVersion topVer,
             GridDhtPartitionTopology top,
-            ConcurrentHashMap8<Integer, Boolean> remaining,
             GridDhtPartitionsExchangeFuture exchFut,
             Set<Integer> missed,
-            GridDhtPartitionDemandMessage d) {
+            GridDhtPartitionDemandMessage d,
+            ConcurrentHashMap8 remaining) {
+
+            if (logg && cctx.name().equals("cache"))
+            System.out.println("D "+idx + " handled supply message "+ cctx.localNode().id());
 
-            //Todo: check it still actual and remove
             // Check that message was received from expected node.
             if (!s.senderId().equals(node.id())) {
                 U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
@@ -639,10 +666,8 @@ public class GridDhtPartitionDemandPool {
                 return;
             }
 
-            assert supply.infos().entrySet().size() == 1;//Todo: remove after supply message refactoring
-
             // Preload.
-            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {//todo:only one partition (supply refactoring)
+            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
                 int p = e.getKey();
 
                 if (cctx.affinity().localNode(p, topVer)) {
@@ -685,12 +710,17 @@ public class GridDhtPartitionDemandPool {
                                 }
                             }
 
-                            boolean last = supply.last().contains(p);//Todo: refactor as boolean "last"
+                            boolean last = supply.last().contains(p);
 
                             // If message was last for this partition,
                             // then we take ownership.
                             if (last) {
-                                top.own(part);
+                                top.own(part);//todo: close future?
+
+//                                if (logg && cctx.name().equals("cache"))
+//                                    System.out.println("D "+idx + " last "+ p +" "+ cctx.localNode().id());
+
+                                remaining.remove(p);
 
                                 if (log.isDebugEnabled())
                                     log.debug("Finished rebalancing partition: " + part);
@@ -698,29 +728,6 @@ public class GridDhtPartitionDemandPool {
                                 if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
                                     preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
                                         exchFut.discoveryEvent());
-
-                                remaining.remove(p);
-
-                                demandNextPartition(node, remaining, d, topVer);
-                            }
-                            else {
-                                try {
-                                    // Create copy.
-                                    GridDhtPartitionDemandMessage nextD =
-                                        new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
-
-                                    nextD.topic(topic(p, topVer.topologyVersion()));
-
-                                    // Send demand message.
-                                    cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()),
-                                        nextD, cctx.ioPolicy(), d.timeout());
-                                   }
-                                catch (IgniteCheckedException ex) {
-                                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
-                                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
-
-                                    cancel();
-                                }
                             }
                         }
                         finally {
@@ -743,48 +750,35 @@ public class GridDhtPartitionDemandPool {
                 }
             }
 
-            for (Integer miss : s.supply().missed()) // Todo: miss as param, not collection
+            for (Integer miss : s.supply().missed())
                 remaining.remove(miss);
 
             // Only request partitions based on latest topology version.
             for (Integer miss : s.supply().missed())
                 if (cctx.affinity().localNode(miss, topVer))
                     missed.add(miss);
-        }
 
-        /**
-         * @param node Node.
-         * @param remaining Remaining.
-         * @param d initial DemandMessage.
-         * @param topVer Topology version.
-         */
-        private void demandNextPartition(
-            final ClusterNode node,
-            final ConcurrentHashMap8<Integer, Boolean> remaining,
-            final GridDhtPartitionDemandMessage d,
-            final AffinityTopologyVersion topVer
-        ) {
-            try {
-                for (Integer p : remaining.keySet()) {
-                    if (remaining.replace(p, false, true)) {
-                        // Create copy.
-                        GridDhtPartitionDemandMessage nextD = new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
+            if (!remaining.isEmpty()) {
+                try {
+                    // Create copy.
+                    GridDhtPartitionDemandMessage nextD =
+                        new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
 
-                        nextD.topic(topic(p, topVer.topologyVersion()));
+                    nextD.topic(topic(idx, cctx.cacheId(), node.id()));
 
-                        // Send demand message.
-                        cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()),
-                            nextD, cctx.ioPolicy(), d.timeout());
+                    // Send demand message.
+                    cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(idx, cctx.cacheId()),
+                        nextD, cctx.ioPolicy(), d.timeout());
 
-                        break;
-                    }
+                    if (logg && cctx.name().equals("cache"))
+                        System.out.println("D " + idx + " ack  " + cctx.localNode().id());
                 }
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to receive partitions from node (rebalancing will not " +
-                    "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
+                catch (IgniteCheckedException ex) {
+                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
 
-                cancel();
+                    cancel();
+                }
             }
         }
 

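The hunk above replaces the old per-partition demand topics with one ordered topic per rebalance thread: the demanded partitions are spread round-robin over getRebalanceThreadPoolSize() sets, and each set is demanded on its own topic built from ("DemandPool", nodeId, cacheId, idx). A standalone sketch of that splitting scheme (plain JDK types, illustrative names, not Ignite code) could look like this:

    import java.util.*;

    /**
     * Sketch of the round-robin partition split used above: partitions are spread
     * over threadCnt sets, one per rebalance thread, and each set gets its own
     * ordered-message topic key. Names are illustrative only.
     */
    public class RoundRobinSplitSketch {
        /** Splits partitions into {@code threadCnt} sets, one per rebalance thread. */
        static List<Set<Integer>> split(Collection<Integer> partitions, int threadCnt) {
            List<Set<Integer>> sParts = new ArrayList<>(threadCnt);

            for (int i = 0; i < threadCnt; i++)
                sParts.add(new HashSet<Integer>());

            int cnt = 0;

            for (Integer p : partitions)
                sParts.get(cnt++ % threadCnt).add(p);

            return sParts;
        }

        /**
         * Per-thread topic key mirroring topic(idx, cacheId, nodeId) in the diff;
         * Ignite builds an internal topic object rather than a List.
         */
        static List<Object> topic(int idx, int cacheId, UUID nodeId) {
            return Arrays.<Object>asList("DemandPool", nodeId, cacheId, idx);
        }

        public static void main(String[] args) {
            List<Integer> parts = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);

            List<Set<Integer>> split = split(parts, 3);

            UUID nodeId = UUID.randomUUID();

            for (int idx = 0; idx < split.size(); idx++)
                System.out.println(topic(idx, 1, nodeId) + " -> " + split.get(idx));
        }
    }

With 8 partitions and 3 rebalance threads, each topic ends up demanding 2 or 3 partitions, which is the load distribution the change above relies on.
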
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4776feca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index f10837a..c1c9941 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -76,25 +76,32 @@ class GridDhtPartitionSupplyPool {
 
         top = cctx.dht().topology();
 
+        int cnt = 0;
+
         if (!cctx.kernalContext().clientNode()) {
-            for (int p = 0; p <= cctx.affinity().partitions(); p++)
-                cctx.io().addOrderedHandler(topic(p, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
+            while (cnt < cctx.config().getRebalanceThreadPoolSize()) {
+                final int idx = cnt;
+
+                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
                     @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
-                        processMessage(m, id);
+                        processMessage(m, id, idx);
                     }
                 });
+
+                cnt++;
+            }
         }
 
         depEnabled = cctx.gridDeploy().enabled();
     }
 
     /**
-     * @param p Partition.
+     * @param idx Index.
      * @param id Node id.
      * @return topic
      */
-    static Object topic(int p, int id) {
-        return TOPIC_CACHE.topic("SupplyPool", id, p);
+    static Object topic(int idx, int id) {
+        return TOPIC_CACHE.topic("SupplyPool", idx, id);
     }
 
     /**
@@ -119,44 +126,65 @@ class GridDhtPartitionSupplyPool {
         this.preloadPred = preloadPred;
     }
 
+    boolean logg = false;
+
     /**
      * @param d Demand message.
      * @param id Node uuid.
      */
-    private void processMessage(GridDhtPartitionDemandMessage d, UUID id) {
+    private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int idx) {
         assert d != null;
         assert id != null;
 
+        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
+            return;
+
+        if (logg && cctx.name().equals("cache"))
+            System.out.println("S " + idx + " process message " + cctx.localNode().id());
+
         GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
             d.updateSequence(), cctx.cacheId());
 
         long preloadThrottle = cctx.config().getRebalanceThrottle();
 
-        long maxBatchesCnt = 3;//Todo: param
-
         ClusterNode node = cctx.discovery().node(id);
 
-        boolean ack = false;
-
         T2<UUID, Object> scId = new T2<>(id, d.topic());
 
         try {
             SupplyContext sctx = scMap.remove(scId);
 
-            if (doneMap.get(scId) != null)//Todo: refactor
+            if (!d.partitions().isEmpty()) {//Only first request contains partitions.
+                doneMap.remove(scId);
+            }
+
+            if (doneMap.get(scId) != null) {
+                if (logg && cctx.name().equals("cache"))
+                    System.out.println("S " + idx + " exit " + cctx.localNode().id());
+
                 return;
+            }
 
             long bCnt = 0;
 
             int phase = 0;
 
-            if (sctx != null)
+            boolean newReq = true;
+
+            long maxBatchesCnt = 3;//Todo: param
+
+            if (sctx != null) {
                 phase = sctx.phase;
 
+                maxBatchesCnt = 1;
+            }
+
             Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
 
-            while (sctx != null || partIt.hasNext()) {
-                int part = sctx != null ? sctx.part : partIt.next();
+            while ((sctx != null && newReq) || partIt.hasNext()) {
+                int part = sctx != null && newReq ? sctx.part : partIt.next();
+
+                newReq = false;
 
                 GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
 
@@ -206,8 +234,6 @@ class GridDhtPartitionSupplyPool {
                             }
 
                             if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                ack = true;
-
                                 if (!reply(node, d, s))
                                     return;
 
@@ -223,6 +249,9 @@ class GridDhtPartitionSupplyPool {
                                     return;
                                 }
                                 else {
+                                    if (logg && cctx.name().equals("cache"))
+                                        System.out.println("S " + idx + " renew " + part + " " + cctx.localNode().id());
+
                                     s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
                                         cctx.cacheId());
                                 }
@@ -275,8 +304,6 @@ class GridDhtPartitionSupplyPool {
                                     }
 
                                     if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                        ack = true;
-
                                         if (!reply(node, d, s))
                                             return;
 
@@ -382,8 +409,6 @@ class GridDhtPartitionSupplyPool {
                             }
 
                             if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                ack = true;
-
                                 if (!reply(node, d, s))
                                     return;
 
@@ -415,11 +440,12 @@ class GridDhtPartitionSupplyPool {
                     // Mark as last supply message.
                     s.last(part);
 
-                    if (ack) {
-                        s.markAck();
+//                    if (logg && cctx.name().equals("cache"))
+//                        System.out.println("S " + idx + " last " + part + " " + cctx.localNode().id());
 
-                        break; // Partition for loop.
-                    }
+                    phase = 0;
+
+                    sctx = null;
                 }
                 finally {
                     loc.release();
@@ -442,13 +468,15 @@ class GridDhtPartitionSupplyPool {
 
     /**
      * @param n Node.
-     * @param d Demand message.
      * @param s Supply message.
      * @return {@code True} if message was sent, {@code false} if recipient left grid.
      * @throws IgniteCheckedException If failed.
      */
     private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
         throws IgniteCheckedException {
+        if (logg && cctx.name().equals("cache"))
+            System.out.println("S sent "+ cctx.localNode().id());
+
         try {
             if (log.isDebugEnabled())
                 log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');

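On the supplier side, the diff above keeps a SupplyContext in scMap so that a demander's follow-up (empty) demand message resumes streaming from where the previous reply stopped: the initial request is served with up to maxBatchesCnt = 3 batches, while a resumed request gets a single batch. A minimal standalone sketch of that save/resume batching (illustrative names, plain JDK types, not Ignite code):

    import java.util.*;

    /**
     * Sketch of supplier-side batching with a saved context: entries are streamed in
     * batches of at most batchSize; when the per-request batch budget is exhausted the
     * iterator position is saved, and the next demand message from the same demander
     * resumes from that saved context.
     */
    public class SupplyBatchingSketch {
        /** Saved position between demand messages. */
        static class SupplyContext {
            final Iterator<Integer> entryIt;

            SupplyContext(Iterator<Integer> entryIt) { this.entryIt = entryIt; }
        }

        private final Map<UUID, SupplyContext> scMap = new HashMap<>();
        private final List<Integer> entries;
        private final int batchSize;

        SupplyBatchingSketch(List<Integer> entries, int batchSize) {
            this.entries = entries;
            this.batchSize = batchSize;
        }

        /** Handles one demand message; returns the batches sent for it. */
        List<List<Integer>> processDemand(UUID demanderId, int maxBatches) {
            SupplyContext sctx = scMap.remove(demanderId);

            Iterator<Integer> it = sctx != null ? sctx.entryIt : entries.iterator();

            List<List<Integer>> sent = new ArrayList<>();
            List<Integer> batch = new ArrayList<>();

            while (it.hasNext()) {
                batch.add(it.next());

                if (batch.size() >= batchSize) {
                    sent.add(batch);
                    batch = new ArrayList<>();

                    if (sent.size() >= maxBatches) {
                        // Budget exhausted: remember where we stopped and wait for the next demand.
                        scMap.put(demanderId, new SupplyContext(it));

                        return sent;
                    }
                }
            }

            if (!batch.isEmpty())
                sent.add(batch); // Last (possibly short) batch.

            return sent;
        }

        public static void main(String[] args) {
            SupplyBatchingSketch s = new SupplyBatchingSketch(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9), 2);

            UUID demander = UUID.randomUUID();

            System.out.println(s.processDemand(demander, 3)); // Initial demand: up to 3 batches.
            System.out.println(s.processDemand(demander, 1)); // Ack resumes from the saved context.
        }
    }
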
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4776feca/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
index 11ea8f6..4992d19 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
@@ -34,7 +34,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
     /** */
     private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-    private static int TEST_SIZE = 1_024_000;
+    private static int TEST_SIZE = 10_024_000;
 
     /** cache name. */
     protected static String CACHE_NAME_DHT = "cache";
@@ -58,7 +58,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
 
         cacheCfg.setName(CACHE_NAME_DHT);
         cacheCfg.setCacheMode(CacheMode.PARTITIONED);
-        cacheCfg.setRebalanceBatchSize(100 * 1024);
+        //cacheCfg.setRebalanceBatchSize(1024);
         cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
         cacheCfg.setRebalanceThreadPoolSize(4);
         //cacheCfg.setRebalanceTimeout(1000000);
@@ -107,7 +107,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
 
         long start = System.currentTimeMillis();
 
-        //startGrid(1);
+        startGrid(1);
 
         startGrid(2);
 
@@ -115,9 +115,9 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
 
         stopGrid(0);
 
-        //Thread.sleep(20000);
+        Thread.sleep(20000);
 
-        //stopGrid(1);
+        stopGrid(1);
 
         checkData(grid(2));
 

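For reference, the rebalance settings exercised by the test above map onto the public CacheConfiguration API roughly as follows; the values are illustrative, not a recommended production configuration:

    import org.apache.ignite.cache.*;
    import org.apache.ignite.configuration.*;

    /**
     * Configuration sketch mirroring the test above: a PARTITIONED cache with SYNC
     * rebalancing and a rebalance thread pool of 4, so the demander/supplier pair
     * exchanges data over four independent ordered topics.
     */
    public class RebalanceConfigSketch {
        static CacheConfiguration<Integer, Integer> cacheConfig() {
            CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();

            cacheCfg.setName("cache");
            cacheCfg.setCacheMode(CacheMode.PARTITIONED);
            cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
            cacheCfg.setRebalanceThreadPoolSize(4);     // Number of demand/supply topics per cache.
            cacheCfg.setRebalanceBatchSize(512 * 1024); // Supply message size threshold, in bytes.

            return cacheCfg;
        }
    }
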

[5/8] incubator-ignite git commit: ignite-1093

Posted by av...@apache.org.
ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d0b7d9fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d0b7d9fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d0b7d9fc

Branch: refs/heads/ignite-1093
Commit: d0b7d9fca8713aefeb6e6477679efb9d7a8db9e0
Parents: 76ba5d9
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Tue Aug 11 18:09:00 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Tue Aug 11 18:09:00 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCachePreloader.java    |   2 +-
 .../cache/GridCachePreloaderAdapter.java        |   2 +-
 .../dht/preloader/GridDhtPartitionDemander.java | 941 +++++++------------
 .../dht/preloader/GridDhtPartitionSupplier.java |  21 +-
 .../dht/preloader/GridDhtPreloader.java         |   2 +-
 5 files changed, 328 insertions(+), 640 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index b8bb08e..1e915eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -91,7 +91,7 @@ public interface GridCachePreloader {
      * @param assignments Assignments to add.
      * @param forcePreload Force preload flag.
      */
-    public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload);
+    public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException;
 
     /**
      * @param p Preload predicate.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 0adf510..68deb2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -142,7 +142,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
+    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException {
         // No-op.
     }
 }

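The GridDhtPartitionDemander diff that follows drops the per-worker remaining map in favor of a SyncFuture that records the remaining partitions per supplier node (syncFut.append(...), syncFut.onPartitionDone(...), syncFut.cancel(...)) and completes once every set has been drained or cancelled. A minimal standalone sketch of that bookkeeping, with hypothetical names and plain JDK types, could be:

    import java.util.*;
    import java.util.concurrent.*;

    /**
     * Sketch of per-node rebalance bookkeeping: remaining partitions are tracked per
     * supplier node, and the future completes once every node's set has been drained
     * or cancelled. Not Ignite code; names are illustrative.
     */
    public class RebalanceSyncFutureSketch {
        private final Map<UUID, Set<Integer>> remaining = new HashMap<>();

        private final CountDownLatch done = new CountDownLatch(1);

        /** Registers the partitions expected from a supplier node. */
        synchronized void append(UUID nodeId, Collection<Integer> parts) {
            remaining.put(nodeId, new HashSet<>(parts));
        }

        /** Marks one partition as rebalanced from the given node. */
        synchronized void onPartitionDone(UUID nodeId, int p) {
            Set<Integer> parts = remaining.get(nodeId);

            if (parts != null && parts.remove(p) && parts.isEmpty())
                remaining.remove(nodeId);

            checkDone();
        }

        /** Drops a node entirely (e.g. on topology change), possibly completing the future. */
        synchronized void cancel(UUID nodeId) {
            remaining.remove(nodeId);

            checkDone();
        }

        /** Blocks until rebalancing from all registered nodes has finished. */
        void get() throws InterruptedException {
            done.await();
        }

        /** Completes the future when no partitions remain. */
        private void checkDone() {
            if (remaining.isEmpty())
                done.countDown();
        }
    }
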
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index fdd101e..e177dae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -27,30 +27,25 @@ import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.timeout.*;
+import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
 import java.util.*;
-import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 import java.util.concurrent.locks.*;
 
-import static java.util.concurrent.TimeUnit.*;
 import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.internal.GridTopic.*;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 import static org.apache.ignite.internal.processors.dr.GridDrType.*;
 
 /**
- * Thread pool for requesting partitions from other nodes
- * and populating local cache.
+ * Thread pool for requesting partitions from other nodes and populating local cache.
  */
 @SuppressWarnings("NonConstantFieldWithUpperCaseName")
 public class GridDhtPartitionDemander {
@@ -63,35 +58,25 @@ public class GridDhtPartitionDemander {
     /** */
     private final ReadWriteLock busyLock;
 
-    /** */
-    @GridToStringInclude
-    private final Collection<DemandWorker> dmdWorkers;
-
     /** Preload predicate. */
     private IgnitePredicate<GridCacheEntryInfo> preloadPred;
 
     /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
     @GridToStringInclude
-    private SyncFuture syncFut;
-
-    /** Preload timeout. */
-    private final AtomicLong timeout;
-
-    /** Allows demand threads to synchronize their step. */
-    private CyclicBarrier barrier;
+    private volatile SyncFuture syncFut;
 
     /** Demand lock. */
     private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
 
-    /** */
-    private int poolSize;
-
     /** Last timeout object. */
     private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
 
     /** Last exchange future. */
     private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
 
+    /** Assignments. */
+    private volatile GridDhtPreloaderAssignments assigns;
+
     /**
      * @param cctx Cache context.
      * @param busyLock Shutdown lock.
@@ -107,53 +92,47 @@ public class GridDhtPartitionDemander {
 
         boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
 
-        poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
         if (enabled) {
-            barrier = new CyclicBarrier(poolSize);
 
-            dmdWorkers = new ArrayList<>(poolSize);
+            for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) {
+                final int idx = cnt;
 
-            for (int i = 0; i < poolSize; i++)
-                dmdWorkers.add(new DemandWorker(i));
+                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                    @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessage m) {
+                        enterBusy();
 
-            syncFut = new SyncFuture(dmdWorkers);
+                        try {
+                            handleSupplyMessage(idx, id, m);
+                        }
+                        finally {
+                            leaveBusy();
+                        }
+                    }
+                });
+            }
         }
-        else {
-            dmdWorkers = Collections.emptyList();
 
-            syncFut = new SyncFuture(dmdWorkers);
+        syncFut = new SyncFuture();
 
+        if (!enabled)
             // Calling onDone() immediately since preloading is disabled.
             syncFut.onDone();
-        }
-
-        timeout = new AtomicLong(cctx.config().getRebalanceTimeout());
     }
 
     /**
      *
      */
     void start() {
-        if (poolSize > 0) {
-            for (DemandWorker w : dmdWorkers)
-                new IgniteThread(cctx.gridName(), "preloader-demand-worker", w).start();
-        }
     }
 
     /**
      *
      */
     void stop() {
-        U.cancel(dmdWorkers);
-
-        if (log.isDebugEnabled())
-            log.debug("Before joining on demand workers: " + dmdWorkers);
-
-        U.join(dmdWorkers, log);
-
-        if (log.isDebugEnabled())
-            log.debug("After joining on demand workers: " + dmdWorkers);
+        if (cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode()) {
+            for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++)
+                cctx.io().removeOrderedHandler(topic(cnt, cctx.cacheId()));
+        }
 
         lastExchangeFut = null;
 
@@ -177,13 +156,6 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @return Size of this thread pool.
-     */
-    int poolSize() {
-        return poolSize;
-    }
-
-    /**
      * Force preload.
      */
     void forcePreload() {
@@ -225,23 +197,22 @@ public class GridDhtPartitionDemander {
      * @param idx
      * @return topic
      */
-    static Object topic(int idx, int cacheId, UUID nodeId) {
-        return TOPIC_CACHE.topic("DemandPool", nodeId, cacheId, idx);//Todo: remove nodeId
+    static Object topic(int idx, int cacheId) {
+        return TOPIC_CACHE.topic("Demander", cacheId, idx);
     }
 
     /**
-     *
+     * @return {@code True} if topology changed.
      */
-    private void leaveBusy() {
-        busyLock.readLock().unlock();
+    private boolean topologyChanged(AffinityTopologyVersion topVer) {
+        return !cctx.affinity().affinityTopologyVersion().equals(topVer);
     }
 
     /**
-     * @param type Type.
-     * @param discoEvt Discovery event.
+     *
      */
-    private void preloadEvent(int type, DiscoveryEvent discoEvt) {
-        preloadEvent(-1, type, discoEvt);
+    private void leaveBusy() {
+        busyLock.readLock().unlock();
     }
 
     /**
@@ -256,28 +227,6 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @param deque Deque to poll from.
-     * @param time Time to wait.
-     * @param w Worker.
-     * @return Polled item.
-     * @throws InterruptedException If interrupted.
-     */
-    @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
-        assert w != null;
-
-        // There is currently a case where {@code interrupted}
-        // flag on a thread gets flipped during stop which causes the pool to hang.  This check
-        // will always make sure that interrupted flag gets reset before going into wait conditions.
-        // The true fix should actually make sure that interrupted flag does not get reset or that
-        // interrupted exception gets propagated. Until we find a real fix, this method should
-        // always work to make sure that there is no hanging during stop.
-        if (w.isCancelled())
-            Thread.currentThread().interrupt();
-
-        return deque.poll(time, MILLISECONDS);
-    }
-
-    /**
      * @param p Partition.
      * @param topVer Topology version.
      * @return Picked owners.
@@ -316,7 +265,7 @@ public class GridDhtPartitionDemander {
      * @param assigns Assignments.
      * @param force {@code True} if dummy reassign.
      */
-    void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
+    void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Adding partition assignments: " + assigns);
 
@@ -325,341 +274,194 @@ public class GridDhtPartitionDemander {
         if (delay == 0 || force) {
             assert assigns != null;
 
-            synchronized (dmdWorkers) {
-                for (DemandWorker w : dmdWorkers)
-                    w.addAssignments(assigns);
-            }
-        }
-        else if (delay > 0) {
-            assert !force;
+            AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
 
-            GridTimeoutObject obj = lastTimeoutObj.get();
+            if (this.assigns != null) {
+                syncFut.get();
 
-            if (obj != null)
-                cctx.time().removeTimeoutObject(obj);
-
-            final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
-
-            assert exchFut != null : "Delaying rebalance process without topology event.";
-
-            obj = new GridTimeoutObjectAdapter(delay) {
-                @Override public void onTimeout() {
-                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
-                            cctx.shared().exchange().forcePreloadExchange(exchFut);
-                        }
-                    });
-                }
-            };
-
-            lastTimeoutObj.set(obj);
+                syncFut = new SyncFuture();
+            }
 
-            cctx.time().addTimeoutObject(obj);
-        }
-    }
+            if (assigns.isEmpty() || topologyChanged(topVer)) {
+                syncFut.onDone();
 
-    /**
-     *
-     */
-    void unwindUndeploys() {
-        demandLock.writeLock().lock();
+                return;
+            }
 
-        try {
-            cctx.deploy().unwind(cctx);
-        }
-        finally {
-            demandLock.writeLock().unlock();
-        }
-    }
+            this.assigns = assigns;
 
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtPartitionDemander.class, this);
-    }
+            for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
+                GridDhtPartitionDemandMessage d = e.getValue();
 
-    /**
-     *
-     */
-    private class DemandWorker extends GridWorker {
-        /** Worker ID. */
-        private int id;
+                d.timeout(cctx.config().getRebalanceTimeout());
+                d.workerId(0);//old api support.
 
-        /** Partition-to-node assignments. */
-        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
+                ClusterNode node = e.getKey();
 
-        /** Hide worker logger and use cache logger instead. */
-        private IgniteLogger log = GridDhtPartitionDemander.this.log;
+                GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
 
-        /**
-         * @param id Worker ID.
-         */
-        private DemandWorker(int id) {
-            super(cctx.gridName(), "preloader-demand-worker", GridDhtPartitionDemander.this.log);
+                remainings.addAll(d.partitions());
 
-            assert id >= 0;
+                syncFut.append(node.id(), remainings);
 
-            this.id = id;
-        }
+                int lsnrCnt = cctx.config().getRebalanceThreadPoolSize();
 
-        /**
-         * @param assigns Assignments.
-         */
-        void addAssignments(GridDhtPreloaderAssignments assigns) {
-            assert assigns != null;
+                List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
 
-            assignQ.offer(assigns);
+                for (int cnt = 0; cnt < lsnrCnt; cnt++)
+                    sParts.add(new HashSet<Integer>());
 
-            if (log.isDebugEnabled())
-                log.debug("Added assignments to worker: " + this);
-        }
+                Iterator<Integer> it = d.partitions().iterator();
 
-        /**
-         * @return {@code True} if topology changed.
-         */
-        private boolean topologyChanged() {
-            return !assignQ.isEmpty() || cctx.shared().exchange().topologyChanged();
-        }
+                int cnt = 0;
 
-        /**
-         * @param pick Node picked for preloading.
-         * @param p Partition.
-         * @param entry Preloaded entry.
-         * @param topVer Topology version.
-         * @return {@code False} if partition has become invalid during preloading.
-         * @throws IgniteInterruptedCheckedException If interrupted.
-         */
-        private boolean preloadEntry(
-            ClusterNode pick,
-            int p,
-            GridCacheEntryInfo entry,
-            AffinityTopologyVersion topVer
-        ) throws IgniteCheckedException {
-            try {
-                GridCacheEntryEx cached = null;
+                while (it.hasNext())
+                    sParts.get(cnt++ % lsnrCnt).add(it.next());
 
-                try {
-                    cached = cctx.dht().entryEx(entry.key());
+                for (cnt = 0; cnt < lsnrCnt; cnt++) {
 
-                    if (log.isDebugEnabled())
-                        log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
+                    if (!sParts.get(cnt).isEmpty()) {
 
-                    if (cctx.dht().isIgfsDataCache() &&
-                        cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
-                        LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
-                            "value, will ignore rebalance entries): " + name());
+                        // Create copy.
+                        GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
 
-                        if (cached.markObsoleteIfEmpty(null))
-                            cached.context().cache().removeIfObsolete(cached.key());
+                        initD.topic(topic(cnt, cctx.cacheId()));
 
-                        return true;
-                    }
-
-                    if (preloadPred == null || preloadPred.apply(entry)) {
-                        if (cached.initialValue(
-                            entry.value(),
-                            entry.version(),
-                            entry.ttl(),
-                            entry.expireTime(),
-                            true,
-                            topVer,
-                            cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
-                        )) {
-                            cctx.evicts().touch(cached, topVer); // Start tracking.
-
-                            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
-                                cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
-                                    (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
-                                    false, null, null, null);
+                        try {
+                            cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
+                        }
+                        catch (IgniteCheckedException ex) {
+                            U.error(log, "Failed to send partition demand message to local node", ex);
                         }
-                        else if (log.isDebugEnabled())
-                            log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
-                                ", part=" + p + ']');
                     }
-                    else if (log.isDebugEnabled())
-                        log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
                 }
-                catch (GridCacheEntryRemovedException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
-                            cached.key() + ", part=" + p + ']');
-                }
-                catch (GridDhtInvalidPartitionException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Partition became invalid during rebalancing (will ignore): " + p);
-
-                    return false;
-                }
-            }
-            catch (IgniteInterruptedCheckedException e) {
-                throw e;
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
-                    cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
-            }
-
-            return true;
-        }
-
-        /**
-         * @param node Node to demand from.
-         * @param topVer Topology version.
-         * @param d Demand message.
-         * @param exchFut Exchange future.
-         * @return Missed partitions.
-         * @throws InterruptedException If interrupted.
-         * @throws ClusterTopologyCheckedException If node left.
-         * @throws IgniteCheckedException If failed to send message.
-         */
-        private Set<Integer> demandFromNode(
-            final ClusterNode node,
-            final AffinityTopologyVersion topVer,
-            final GridDhtPartitionDemandMessage d,
-            final GridDhtPartitionsExchangeFuture exchFut
-        ) throws InterruptedException, IgniteCheckedException {
-            final GridDhtPartitionTopology top = cctx.dht().topology();
 
-            long timeout = GridDhtPartitionDemander.this.timeout.get();
+                if (log.isInfoEnabled() && !d.partitions().isEmpty()) {
+                    LinkedList<Integer> s = new LinkedList<>(d.partitions());
 
-            d.timeout(timeout);
-            d.workerId(id);
+                    Collections.sort(s);
 
-            final Set<Integer> missed = new HashSet<>();
+                    StringBuilder sb = new StringBuilder();
 
-            final ConcurrentHashMap8<Integer, Boolean> remaining = new ConcurrentHashMap8<>();
+                    int start = -1;
 
-            for (int p : d.partitions())
-                remaining.put(p, false);
+                    int prev = -1;
 
-            if (isCancelled() || topologyChanged())
-                return missed;
+                    Iterator<Integer> sit = s.iterator();
 
-            int threadCnt = cctx.config().getRebalanceThreadPoolSize(); //todo = getRebalanceThreadPoolSize / assigns.count
+                    while (sit.hasNext()) {
+                        int p = sit.next();
+                        if (start == -1) {
+                            start = p;
+                            prev = p;
+                        }
 
-            List<Set<Integer>> sParts = new ArrayList<>(threadCnt);
+                        if (prev < p - 1) {
+                            sb.append(start);
 
-            int cnt = 0;
+                            if (start != prev)
+                                sb.append("-").append(prev);
 
-            while (cnt < threadCnt) {
-                sParts.add(new HashSet<Integer>());
+                            sb.append(", ");
 
-                final int idx = cnt;
+                            start = p;
+                        }
 
-                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId(), node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                    @Override public void apply(UUID id, GridDhtPartitionSupplyMessage m) {
-                        enterBusy();
+                        if (!sit.hasNext()) {
+                            sb.append(start);
 
-                        try {
-                            handleSupplyMessage(idx, new SupplyMessage(id, m), node, topVer, top,
-                                exchFut, missed, d, remaining);
-                        }finally{
-                            leaveBusy();
+                            if (start != p)
+                                sb.append("-").append(p);
                         }
+
+                        prev = p;
                     }
-                });
 
-                cnt++;
+                    log.info("Requested rebalancing [from node=" + node.id() + ", partitions=" + s.size() + " (" + sb.toString() + ")]");
+                }
             }
+        }
+        else if (delay > 0) {
+            GridTimeoutObject obj = lastTimeoutObj.get();
 
-            Iterator<Integer> it = d.partitions().iterator();
-
-            cnt = 0;
+            if (obj != null)
+                cctx.time().removeTimeoutObject(obj);
 
-            while (it.hasNext()) {
-                sParts.get(cnt % threadCnt).add(it.next());
+            final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
 
-                cnt++;
-            }
+            assert exchFut != null : "Delaying rebalance process without topology event.";
 
-            try {
-                cnt = 0;
+            obj = new GridTimeoutObjectAdapter(delay) {
+                @Override public void onTimeout() {
+                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
+                            cctx.shared().exchange().forcePreloadExchange(exchFut);
+                        }
+                    });
+                }
+            };
 
-                while (cnt < threadCnt) {
+            lastTimeoutObj.set(obj);
 
-                    // Create copy.
-                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+            cctx.time().addTimeoutObject(obj);
+        }
+    }
 
-                    initD.topic(topic(cnt, cctx.cacheId(),node.id()));
+    /**
+     *
+     */
+    void unwindUndeploys() {
+        demandLock.writeLock().lock();
 
-                    try {
-                        if (logg && cctx.name().equals("cache"))
-                        System.out.println("D "+cnt + " initial Demand "+" "+cctx.localNode().id());
+        try {
+            cctx.deploy().unwind(cctx);
+        }
+        finally {
+            demandLock.writeLock().unlock();
+        }
+    }
 
-                        cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to send partition demand message to local node", e);
-                    }
+    /**
+     * @param idx Index.
+     * @param id Node id.
+     * @param supply Supply.
+     */
+    private void handleSupplyMessage(
+        int idx,
+        final UUID id,
+        final GridDhtPartitionSupplyMessage supply) {
+        ClusterNode node = cctx.node(id);
 
-                    cnt++;
-                }
+        assert node != null;
 
-                do {
-                    U.sleep(1000);//Todo: improve
-                }
-                while (!isCancelled() && !topologyChanged() && !remaining.isEmpty());
+        GridDhtPartitionDemandMessage d = assigns.get(node);
 
-                return missed;
-            }
-            finally {
-                cnt = 0;
+        AffinityTopologyVersion topVer = d.topologyVersion();
 
-                while (cnt < threadCnt) {
-                    cctx.io().removeOrderedHandler(topic(cnt,cctx.cacheId(), node.id()));
+        if (topologyChanged(topVer)) {
+            syncFut.cancel(id);
 
-                    cnt++;
-                }
-            }
+            return;
         }
 
-        boolean logg = false;
-
-        /**
-         * @param s Supply message.
-         * @param node Node.
-         * @param topVer Topology version.
-         * @param top Topology.
-         * @param exchFut Exchange future.
-         * @param missed Missed.
-         * @param d initial DemandMessage.
-         */
-        private void handleSupplyMessage(
-            int idx,
-            SupplyMessage s,
-            ClusterNode node,
-            AffinityTopologyVersion topVer,
-            GridDhtPartitionTopology top,
-            GridDhtPartitionsExchangeFuture exchFut,
-            Set<Integer> missed,
-            GridDhtPartitionDemandMessage d,
-            ConcurrentHashMap8 remaining) {
-
-            if (logg && cctx.name().equals("cache"))
-            System.out.println("D "+idx + " handled supply message "+ cctx.localNode().id());
-
-            // Check that message was received from expected node.
-            if (!s.senderId().equals(node.id())) {
-                U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
-                    ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
+        if (log.isDebugEnabled())
+            log.debug("Received supply message: " + supply);
 
-                return;
-            }
+        // Check whether there were class loading errors on unmarshal
+        if (supply.classError() != null) {
+            if (log.isDebugEnabled())
+                log.debug("Class got undeployed during preloading: " + supply.classError());
 
-            if (topologyChanged())
-                return;
+            syncFut.cancel(id);
 
-            if (log.isDebugEnabled())
-                log.debug("Received supply message: " + s);
+            return;
+        }
 
-            GridDhtPartitionSupplyMessage supply = s.supply();
+        final GridDhtPartitionTopology top = cctx.dht().topology();
 
-            // Check whether there were class loading errors on unmarshal
-            if (supply.classError() != null) {
-                if (log.isDebugEnabled())
-                    log.debug("Class got undeployed during preloading: " + supply.classError());
+        GridDhtPartitionsExchangeFuture exchFut = assigns.exchangeFuture();
 
-                return;
-            }
+        try {
 
             // Preload.
             for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
@@ -689,19 +491,12 @@ public class GridDhtPartitionDemander {
 
                                     continue;
                                 }
-                                try {
-                                    if (!preloadEntry(node, p, entry, topVer)) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Got entries for invalid partition during " +
-                                                "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
-
-                                        break;
-                                    }
-                                }
-                                catch (IgniteCheckedException ex) {
-                                    cancel();
+                                if (!preloadEntry(node, p, entry, topVer)) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got entries for invalid partition during " +
+                                            "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
 
-                                    return;
+                                    break;
                                 }
                             }
 
@@ -710,12 +505,9 @@ public class GridDhtPartitionDemander {
                             // If message was last for this partition,
                             // then we take ownership.
                             if (last) {
-                                top.own(part);//todo: close future?
-
-//                                if (logg && cctx.name().equals("cache"))
-//                                    System.out.println("D "+idx + " last "+ p +" "+ cctx.localNode().id());
+                                top.own(part);
 
-                                remaining.remove(p);
+                                syncFut.onPartitionDone(id, p);
 
                                 if (log.isDebugEnabled())
                                     log.debug("Finished rebalancing partition: " + part);
@@ -731,218 +523,139 @@ public class GridDhtPartitionDemander {
                         }
                     }
                     else {
-                        remaining.remove(p);
+                        syncFut.onPartitionDone(id, p);
 
                         if (log.isDebugEnabled())
                             log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
                     }
                 }
                 else {
-                    remaining.remove(p);
+                    syncFut.onPartitionDone(id, p);
 
                     if (log.isDebugEnabled())
                         log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
                 }
             }
 
-            for (Integer miss : s.supply().missed())
-                remaining.remove(miss);
-
             // Only request partitions based on latest topology version.
-            for (Integer miss : s.supply().missed())
+            for (Integer miss : supply.missed())
                 if (cctx.affinity().localNode(miss, topVer))
-                    missed.add(miss);
+                    syncFut.onMissedPartition(id, miss);
 
-            if (!remaining.isEmpty()) {
-                try {
-                    // Create copy.
-                    GridDhtPartitionDemandMessage nextD =
-                        new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
+            for (Integer miss : supply.missed())
+                syncFut.onPartitionDone(id, miss);
 
-                    nextD.topic(topic(idx, cctx.cacheId(), node.id()));
+            if (!syncFut.isDone()) {
 
-                    // Send demand message.
-                    cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()),
-                        nextD, cctx.ioPolicy(), d.timeout());
+                // Create copy.
+                GridDhtPartitionDemandMessage nextD =
+                    new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
 
-                    if (logg && cctx.name().equals("cache"))
-                        System.out.println("D " + idx + " ack  " + cctx.localNode().id());
-                }
-                catch (IgniteCheckedException ex) {
-                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
-                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
+                nextD.topic(topic(idx, cctx.cacheId()));
 
-                    cancel();
-                }
+                // Send demand message.
+                cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()),
+                    nextD, cctx.ioPolicy(), d.timeout());
             }
         }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
+                    ", msg=" + e.getMessage() + ']');
+            syncFut.cancel(id);
+        }
+        catch (IgniteCheckedException ex) {
+            U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+                "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
 
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            try {
-                int rebalanceOrder = cctx.config().getRebalanceOrder();
-
-                if (!CU.isMarshallerCache(cctx.name())) {
-                    if (log.isDebugEnabled())
-                        log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
+            syncFut.cancel(id);
+        }
+    }
 
-                    try {
-                        cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
-                    }
-                    catch (IgniteInterruptedCheckedException ignored) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
-                                "[cacheName=" + cctx.name() + ']');
+    /**
+     * @param pick Node picked for preloading.
+     * @param p Partition.
+     * @param entry Preloaded entry.
+     * @param topVer Topology version.
+     * @return {@code False} if partition has become invalid during preloading.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    private boolean preloadEntry(
+        ClusterNode pick,
+        int p,
+        GridCacheEntryInfo entry,
+        AffinityTopologyVersion topVer
+    ) throws IgniteCheckedException {
+        try {
+            GridCacheEntryEx cached = null;
 
-                        return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
-                    }
-                }
+            try {
+                cached = cctx.dht().entryEx(entry.key());
 
-                if (rebalanceOrder > 0) {
-                    IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
 
-                    try {
-                        if (fut != null) {
-                            if (log.isDebugEnabled())
-                                log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
-                                    ", rebalanceOrder=" + rebalanceOrder + ']');
+                if (cctx.dht().isIgfsDataCache() &&
+                    cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
+                    LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
+                        "value, will ignore rebalance entries)");
 
-                            fut.get();
-                        }
-                    }
-                    catch (IgniteInterruptedCheckedException ignored) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
-                                "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
+                    if (cached.markObsoleteIfEmpty(null))
+                        cached.context().cache().removeIfObsolete(cached.key());
 
-                        return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
-                    }
+                    return true;
                 }
 
-                GridDhtPartitionsExchangeFuture exchFut = null;
-
-                boolean stopEvtFired = false;
-
-                while (!isCancelled()) {
-                    try {
-                        barrier.await();
-
-                        if (id == 0 && exchFut != null && !exchFut.dummy() &&
-                            cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED)) {
-
-                            if (!cctx.isReplicated() || !stopEvtFired) {
-                                preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
-                                stopEvtFired = true;
-                            }
-                        }
-                    }
-                    catch (BrokenBarrierException ignore) {
-                        throw new InterruptedException("Demand worker stopped.");
-                    }
-
-                    // Sync up all demand threads at this step.
-                    GridDhtPreloaderAssignments assigns = null;
-
-                    while (assigns == null)
-                        assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
-
-                    demandLock.readLock().lock();
-
-                    try {
-                        exchFut = assigns.exchangeFuture();
-
-                        // Assignments are empty if preloading is disabled.
-                        if (assigns.isEmpty())
-                            continue;
-
-                        boolean resync = false;
-
-                        // While.
-                        // =====
-                        while (!isCancelled() && !topologyChanged() && !resync) {
-                            Collection<Integer> missed = new HashSet<>();
-
-                            // For.
-                            // ===
-                            for (ClusterNode node : assigns.keySet()) {
-                                if (topologyChanged() || isCancelled())
-                                    break; // For.
-
-                                GridDhtPartitionDemandMessage d = assigns.remove(node);
-
-                                // If another thread is already processing this message,
-                                // move to the next node.
-                                if (d == null)
-                                    continue; // For.
-
-                                try {
-                                    Set<Integer> set = demandFromNode(node, assigns.topologyVersion(), d, exchFut);
-
-                                    if (!set.isEmpty()) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" +
-                                                set + ']');
-
-                                        missed.addAll(set);
-                                    }
-                                }
-                                catch (IgniteInterruptedCheckedException e) {
-                                    throw e;
-                                }
-                                catch (ClusterTopologyCheckedException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
-                                            ", msg=" + e.getMessage() + ']');
-
-                                    resync = true;
-
-                                    break; // For.
-                                }
-                                catch (IgniteCheckedException e) {
-                                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
-                                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
-                                }
-                            }
-
-                            // Processed missed entries.
-                            if (!missed.isEmpty()) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Reassigning partitions that were missed: " + missed);
-
-                                assert exchFut.exchangeId() != null;
-
-                                cctx.shared().exchange().forceDummyExchange(true, exchFut);
-                            }
-                            else
-                                break; // While.
-                        }
-                    }
-                    finally {
-                        demandLock.readLock().unlock();
-
-                        syncFut.onWorkerDone(this);
+                if (preloadPred == null || preloadPred.apply(entry)) {
+                    if (cached.initialValue(
+                        entry.value(),
+                        entry.version(),
+                        entry.ttl(),
+                        entry.expireTime(),
+                        true,
+                        topVer,
+                        cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+                    )) {
+                        cctx.evicts().touch(cached, topVer); // Start tracking.
+
+                        if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
+                            cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
+                                (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
+                                false, null, null, null);
                     }
-
-                    cctx.shared().exchange().scheduleResendPartitions();
+                    else if (log.isDebugEnabled())
+                        log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
+                            ", part=" + p + ']');
                 }
+                else if (log.isDebugEnabled())
+                    log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
             }
-            finally {
-                // Safety.
-                syncFut.onWorkerDone(this);
+            catch (GridCacheEntryRemovedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
+                        cached.key() + ", part=" + p + ']');
             }
-        }
+            catch (GridDhtInvalidPartitionException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Partition became invalid during rebalancing (will ignore): " + p);
 
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(DemandWorker.class, this, "assignQ", assignQ, "super", super.toString());
+                return false;
+            }
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw e;
         }
+        catch (IgniteCheckedException e) {
+            throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
+                cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtPartitionDemander.class, this);
     }
 
     /**
@@ -1035,88 +748,80 @@ public class GridDhtPartitionDemander {
         return assigns;
     }
 
-    /**
-     *
-     */
-    private class SyncFuture extends GridFutureAdapter<Object> {
-        /** */
-        private static final long serialVersionUID = 0L;
+/**
+ * Rebalance future: tracks remaining and missed partitions per supplier node and completes when all are done.
+ */
+private class SyncFuture extends GridFutureAdapter<Object> {
+    /** */
+    private static final long serialVersionUID = 1L;
 
-        /** Remaining workers. */
-        private Collection<DemandWorker> remaining;
+    private ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new ConcurrentHashMap8<>();
 
-        /**
-         * @param workers List of workers.
-         */
-        private SyncFuture(Collection<DemandWorker> workers) {
-            assert workers.size() == poolSize();
+    private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
 
-            remaining = Collections.synchronizedList(new LinkedList<>(workers));
-        }
+    public void append(UUID nodeId, Collection<Integer> parts) {
+        remaining.put(nodeId, parts);
 
-        /**
-         * @param w Worker who iterated through all partitions.
-         */
-        void onWorkerDone(DemandWorker w) {
-            if (isDone())
-                return;
+        missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+    }
 
-            if (remaining.remove(w))
-                if (log.isDebugEnabled())
-                    log.debug("Completed full partition iteration for worker [worker=" + w + ']');
+    void cancel(UUID nodeId) {
+        if (isDone())
+            return;
 
-            if (remaining.isEmpty()) {
-                if (log.isDebugEnabled())
-                    log.debug("Completed sync future.");
+        remaining.remove(nodeId);
 
-                onDone();
-            }
-        }
+        checkIsDone();
     }
 
-    /**
-     * Supply message wrapper.
-     */
-    private static class SupplyMessage {
-        /** Sender ID. */
-        private UUID sndId;
-
-        /** Supply message. */
-        private GridDhtPartitionSupplyMessage supply;
-
-        /**
-         * Dummy constructor.
-         */
-        private SupplyMessage() {
-            // No-op.
-        }
+    void onMissedPartition(UUID nodeId, int p) {
+        if (missed.get(nodeId) == null)
+            missed.put(nodeId, new GridConcurrentHashSet<Integer>());
 
-        /**
-         * @param sndId Sender ID.
-         * @param supply Supply message.
-         */
-        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
-            this.sndId = sndId;
-            this.supply = supply;
-        }
+        missed.get(nodeId).add(p);
+    }
 
-        /**
-         * @return Sender ID.
-         */
-        UUID senderId() {
-            return sndId;
-        }
+    void onPartitionDone(UUID nodeId, int p) {
+        if (isDone())
+            return;
+
+        Collection<Integer> parts = remaining.get(nodeId);
+
+        parts.remove(p);
 
-        /**
-         * @return Message.
-         */
-        GridDhtPartitionSupplyMessage supply() {
-            return supply;
+        if (parts.isEmpty()) {
+            remaining.remove(nodeId);
+
+            if (log.isDebugEnabled())
+                log.debug("Completed full partition iteration for node [nodeId=" + nodeId + ']');
         }
 
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(SupplyMessage.class, this);
+        checkIsDone();
+    }
+
+    private void checkIsDone() {
+        if (remaining.isEmpty()) {
+            if (log.isDebugEnabled())
+                log.debug("Completed sync future.");
+
+            Collection<Integer> m = new HashSet<>();
+
+            for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
+                if (e.getValue() != null && !e.getValue().isEmpty())
+                    m.addAll(e.getValue());
+            }
+
+            if (!m.isEmpty()) {
+                if (log.isDebugEnabled())
+                    log.debug("Reassigning partitions that were missed: " + m);
+
+                cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture());
+            }
+
+            missed.clear();
+
+            onDone();
         }
     }
 }
+}
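
For readers skimming the patch: the SyncFuture above replaces the old per-worker bookkeeping with per-supplier-node partition tracking. The standalone sketch below (plain Java, hypothetical names, not the Ignite API) illustrates the same idea: each node maps to the set of partitions still expected from it, missed partitions are collected separately, and the future completes once every node's set drains so the caller can reassign whatever was missed.

import java.util.*;
import java.util.concurrent.*;

/** Standalone sketch of per-node rebalance tracking (names are illustrative, not Ignite's). */
class RebalanceSyncFuture {
    /** Partitions still expected from each supplier node. */
    private final ConcurrentMap<UUID, Set<Integer>> remaining = new ConcurrentHashMap<>();

    /** Partitions each node reported it could not supply. */
    private final ConcurrentMap<UUID, Set<Integer>> missed = new ConcurrentHashMap<>();

    /** Completes with the union of missed partitions (possibly empty). */
    private final CompletableFuture<Set<Integer>> done = new CompletableFuture<>();

    /** Registers partitions expected from a supplier node. */
    void append(UUID nodeId, Collection<Integer> parts) {
        Set<Integer> set = ConcurrentHashMap.newKeySet();

        set.addAll(parts);

        remaining.put(nodeId, set);
        missed.put(nodeId, ConcurrentHashMap.newKeySet());
    }

    /** Records a partition the node could not provide. */
    void onMissedPartition(UUID nodeId, int p) {
        missed.computeIfAbsent(nodeId, id -> ConcurrentHashMap.newKeySet()).add(p);
    }

    /** Marks a partition as finished (owned or skipped) for the given node. */
    void onPartitionDone(UUID nodeId, int p) {
        Set<Integer> parts = remaining.get(nodeId);

        if (parts == null)
            return;

        parts.remove(p);

        if (parts.isEmpty())
            remaining.remove(nodeId);

        checkIsDone();
    }

    /** Drops a node entirely, e.g. when it leaves the topology or unmarshalling fails. */
    void cancel(UUID nodeId) {
        remaining.remove(nodeId);

        checkIsDone();
    }

    /** Completes the future once nothing remains; the caller reassigns missed partitions. */
    private void checkIsDone() {
        if (remaining.isEmpty()) {
            Set<Integer> m = new HashSet<>();

            for (Set<Integer> s : missed.values())
                m.addAll(s);

            done.complete(m);
        }
    }

    CompletableFuture<Set<Integer>> future() {
        return done;
    }
}

In the patch itself this role is played by GridFutureAdapter plus forceDummyExchange() for the missed set; the sketch only captures the bookkeeping.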

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 920d10d..b948fbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -108,7 +108,7 @@ class GridDhtPartitionSupplier {
      * @return topic
      */
     static Object topic(int idx, int id) {
-        return TOPIC_CACHE.topic("SupplyPool", idx, id);
+        return TOPIC_CACHE.topic("Supplier", idx, id);
     }
 
     /**
@@ -138,8 +138,6 @@ class GridDhtPartitionSupplier {
         this.preloadPred = preloadPred;
     }
 
-    boolean logg = false;
-
     /**
      * @return {@code true} if entered to busy state.
      */
@@ -172,9 +170,6 @@ class GridDhtPartitionSupplier {
         if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
             return;
 
-        if (logg && cctx.name().equals("cache"))
-            System.out.println("S " + idx + " process message " + cctx.localNode().id());
-
         GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
             d.updateSequence(), cctx.cacheId());
 
@@ -191,12 +186,8 @@ class GridDhtPartitionSupplier {
                 doneMap.remove(scId);
             }
 
-            if (doneMap.get(scId) != null) {
-                if (logg && cctx.name().equals("cache"))
-                    System.out.println("S " + idx + " exit " + cctx.localNode().id());
-
+            if (doneMap.get(scId) != null)
                 return;
-            }
 
             long bCnt = 0;
 
@@ -282,9 +273,6 @@ class GridDhtPartitionSupplier {
                                     return;
                                 }
                                 else {
-                                    if (logg && cctx.name().equals("cache"))
-                                        System.out.println("S " + idx + " renew " + part + " " + cctx.localNode().id());
-
                                     s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
                                         cctx.cacheId());
                                 }
@@ -473,9 +461,6 @@ class GridDhtPartitionSupplier {
                     // Mark as last supply message.
                     s.last(part);
 
-//                    if (logg && cctx.name().equals("cache"))
-//                        System.out.println("S " + idx + " last " + part + " " + cctx.localNode().id());
-
                     phase = 0;
 
                     sctx = null;
@@ -508,8 +493,6 @@ class GridDhtPartitionSupplier {
      */
     private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
         throws IgniteCheckedException {
-        if (logg && cctx.name().equals("cache"))
-            System.out.println("S sent "+ cctx.localNode().id());
 
         try {
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index a22f281..8a097ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -254,7 +254,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
+    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException {
         demander.addAssignments(assignments, forcePreload);
     }
 


[2/8] incubator-ignite git commit: ignite-1093 Code cleanup

Posted by av...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
new file mode 100644
index 0000000..920d10d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jsr166.*;
+
+import java.util.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Thread pool for supplying partitions to demanding nodes.
+ */
+class GridDhtPartitionSupplier {
+    /** */
+    private final GridCacheContext<?, ?> cctx;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private final ReadWriteLock busyLock;
+
+    /** */
+    private GridDhtPartitionTopology top;
+
+    /** */
+    private final boolean depEnabled;
+
+    /** Preload predicate. */
+    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+    /** Supply context map. */
+    private ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
+
+    /** Done map. */
+    private ConcurrentHashMap8<T2, Boolean> doneMap = new ConcurrentHashMap8<>();
+
+    /**
+     * @param cctx Cache context.
+     * @param busyLock Shutdown lock.
+     */
+    GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
+        assert cctx != null;
+        assert busyLock != null;
+
+        this.cctx = cctx;
+        this.busyLock = busyLock;
+
+        log = cctx.logger(getClass());
+
+        top = cctx.dht().topology();
+
+        if (!cctx.kernalContext().clientNode()) {
+            for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) {
+                final int idx = cnt;
+
+                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
+                    @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+                        if (!enterBusy())
+                            return;
+
+                        try {
+                            processMessage(m, id, idx);
+                        }
+                        finally {
+                            leaveBusy();
+                        }
+                    }
+                });
+            }
+        }
+
+        depEnabled = cctx.gridDeploy().enabled();
+    }
+
+    /**
+     * @param idx Index.
+     * @param id Node id.
+     * @return topic
+     */
+    static Object topic(int idx, int id) {
+        return TOPIC_CACHE.topic("SupplyPool", idx, id);
+    }
+
+    /**
+     *
+     */
+    void start() {
+    }
+
+    /**
+     *
+     */
+    void stop() {
+        top = null;
+
+        if (!cctx.kernalContext().clientNode()) {
+            for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++)
+                cctx.io().removeOrderedHandler(topic(cnt, cctx.cacheId()));
+        }
+    }
+
+    /**
+     * Sets preload predicate for supply pool.
+     *
+     * @param preloadPred Preload predicate.
+     */
+    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+        this.preloadPred = preloadPred;
+    }
+
+    boolean logg = false;
+
+    /**
+     * @return {@code true} if entered to busy state.
+     */
+    private boolean enterBusy() {
+        if (busyLock.readLock().tryLock())
+            return true;
+
+        if (log.isDebugEnabled())
+            log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId());
+
+        return false;
+    }
+
+    /**
+     *
+     */
+    private void leaveBusy() {
+        busyLock.readLock().unlock();
+    }
+
+    /**
+     * @param d Demand message.
+     * @param id Node uuid.
+     * @param idx Index.
+     */
+    private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int idx) {
+        assert d != null;
+        assert id != null;
+
+        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
+            return;
+
+        if (logg && cctx.name().equals("cache"))
+            System.out.println("S " + idx + " process message " + cctx.localNode().id());
+
+        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
+            d.updateSequence(), cctx.cacheId());
+
+        long preloadThrottle = cctx.config().getRebalanceThrottle();
+
+        ClusterNode node = cctx.discovery().node(id);
+
+        T2<UUID, Object> scId = new T2<>(id, d.topic());
+
+        try {
+            SupplyContext sctx = scMap.remove(scId);
+
+            if (!d.partitions().isEmpty()) {//Only initial request contains partitions.
+                doneMap.remove(scId);
+            }
+
+            if (doneMap.get(scId) != null) {
+                if (logg && cctx.name().equals("cache"))
+                    System.out.println("S " + idx + " exit " + cctx.localNode().id());
+
+                return;
+            }
+
+            long bCnt = 0;
+
+            int phase = 0;
+
+            boolean newReq = true;
+
+            long maxBatchesCnt = 3;//Todo: param
+
+            if (sctx != null) {
+                phase = sctx.phase;
+
+                maxBatchesCnt = 1;
+            }
+
+            Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
+
+            while ((sctx != null && newReq) || partIt.hasNext()) {
+                int part = sctx != null && newReq ? sctx.part : partIt.next();
+
+                newReq = false;
+
+                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+
+                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                    // Reply with partition of "-1" to let sender know that
+                    // this node is no longer an owner.
+                    s.missed(part);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Requested partition is not owned by local node [part=" + part +
+                            ", demander=" + id + ']');
+
+                    continue;
+                }
+
+                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+                try {
+                    if (phase == 0 && cctx.isSwapOrOffheapEnabled()) {
+                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+                        cctx.swap().addOffHeapListener(part, swapLsnr);
+                        cctx.swap().addSwapListener(part, swapLsnr);
+                    }
+
+                    boolean partMissing = false;
+
+                    if (phase == 0)
+                        phase = 1;
+
+                    if (phase == 1) {
+                        Iterator<GridDhtCacheEntry> entIt = sctx != null ?
+                            (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();
+
+                        while (entIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition, so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition [part=" + part +
+                                        ", nodeId=" + id + ']');
+
+                                partMissing = true;
+
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (!reply(node, d, s))
+                                    return;
+
+                                // Throttle preloading.
+                                if (preloadThrottle > 0)
+                                    U.sleep(preloadThrottle);
+
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
+
+                                    swapLsnr = null;
+
+                                    return;
+                                }
+                                else {
+                                    if (logg && cctx.name().equals("cache"))
+                                        System.out.println("S " + idx + " renew " + part + " " + cctx.localNode().id());
+
+                                    s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                        cctx.cacheId());
+                                }
+                            }
+
+                            GridCacheEntryEx e = entIt.next();
+
+                            GridCacheEntryInfo info = e.info();
+
+                            if (info != null && !info.isNew()) {
+                                if (preloadPred == null || preloadPred.apply(info))
+                                    s.addEntry(part, info, cctx);
+                                else if (log.isDebugEnabled())
+                                    log.debug("Rebalance predicate evaluated to false (will not send cache entry): " +
+                                        info);
+                            }
+                        }
+
+                        if (partMissing)
+                            continue;
+
+                    }
+
+                    if (phase == 1)
+                        phase = 2;
+
+                    if (phase == 2 && cctx.isSwapOrOffheapEnabled()) {
+                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = sctx != null ?
+                            (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
+                            cctx.swap().iterator(part);
+
+                        // Iterator may be null if space does not exist.
+                        if (iter != null) {
+                            try {
+                                boolean prepared = false;
+
+                                while (iter.hasNext()) {
+                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                        // Demander no longer needs this partition,
+                                        // so we send '-1' partition and move on.
+                                        s.missed(part);
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Demanding node does not need requested partition " +
+                                                "[part=" + part + ", nodeId=" + id + ']');
+
+                                        partMissing = true;
+
+                                        break; // For.
+                                    }
+
+                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                        if (!reply(node, d, s))
+                                            return;
+
+                                        // Throttle preloading.
+                                        if (preloadThrottle > 0)
+                                            U.sleep(preloadThrottle);
+
+                                        if (++bCnt >= maxBatchesCnt) {
+                                            saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
+
+                                            swapLsnr = null;
+
+                                            return;
+                                        }
+                                        else {
+                                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                                cctx.cacheId());
+                                        }
+                                    }
+
+                                    Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
+
+                                    GridCacheSwapEntry swapEntry = e.getValue();
+
+                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                                    info.keyBytes(e.getKey());
+                                    info.ttl(swapEntry.ttl());
+                                    info.expireTime(swapEntry.expireTime());
+                                    info.version(swapEntry.version());
+                                    info.value(swapEntry.value());
+
+                                    if (preloadPred == null || preloadPred.apply(info))
+                                        s.addEntry0(part, info, cctx);
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Rebalance predicate evaluated to false (will not send " +
+                                                "cache entry): " + info);
+
+                                        continue;
+                                    }
+
+                                    // Need to manually prepare cache message.
+                                    if (depEnabled && !prepared) {
+                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+                                            swapEntry.valueClassLoaderId() != null ?
+                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+                                                null;
+
+                                        if (ldr == null)
+                                            continue;
+
+                                        if (ldr instanceof GridDeploymentInfo) {
+                                            s.prepare((GridDeploymentInfo)ldr);
+
+                                            prepared = true;
+                                        }
+                                    }
+                                }
+
+                                if (partMissing)
+                                    continue;
+                            }
+                            finally {
+                                iter.close();
+                            }
+                        }
+                    }
+
+                    if (swapLsnr == null && sctx != null)
+                        swapLsnr = sctx.swapLsnr;
+
+                    // Stop receiving promote notifications.
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+
+                    if (phase == 2)
+                        phase = 3;
+
+                    if (phase == 3 && swapLsnr != null) {
+                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+                        swapLsnr = null;
+
+                        Iterator<GridCacheEntryInfo> lsnrIt = sctx != null ?
+                            (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();
+
+                        while (lsnrIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition,
+                                // so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition " +
+                                        "[part=" + part + ", nodeId=" + id + ']');
+
+                                // No need to continue iteration over swap entries.
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (!reply(node, d, s))
+                                    return;
+
+                                // Throttle preloading.
+                                if (preloadThrottle > 0)
+                                    U.sleep(preloadThrottle);
+
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
+
+                                    return;
+                                }
+                                else {
+                                    s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                        cctx.cacheId());
+                                }
+                            }
+
+                            GridCacheEntryInfo info = lsnrIt.next();
+
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not send cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    // Mark as last supply message.
+                    s.last(part);
+
+//                    if (logg && cctx.name().equals("cache"))
+//                        System.out.println("S " + idx + " last " + part + " " + cctx.localNode().id());
+
+                    phase = 0;
+
+                    sctx = null;
+                }
+                finally {
+                    loc.release();
+
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+                }
+            }
+
+            reply(node, d, s);
+
+            doneMap.put(scId, true);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send partition supply message to node: " + id, e);
+        }
+    }
+
+    /**
+     * @param n Node.
+     * @param d Demand message.
+     * @param s Supply message.
+     * @return {@code True} if message was sent, {@code false} if recipient left grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+        throws IgniteCheckedException {
+        if (logg && cctx.name().equals("cache"))
+            System.out.println("S sent "+ cctx.localNode().id());
+
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+            return true;
+        }
+        catch (ClusterTopologyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+            return false;
+        }
+    }
+
+    /**
+     * @param t Supply context key (node id and topic).
+     * @param phase Phase.
+     * @param partIt Partition iterator.
+     * @param part Partition.
+     * @param entryIt Entry iterator.
+     * @param swapLsnr Swap listener.
+     */
+    private void saveSupplyContext(
+        T2 t,
+        int phase,
+        Iterator<Integer> partIt,
+        int part,
+        Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr) {
+        scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
+    }
+
+    /**
+     * Supply context.
+     */
+    private static class SupplyContext {
+        /** Phase. */
+        private int phase;
+
+        /** Partition iterator. */
+        private Iterator<Integer> partIt;
+
+        /** Entry iterator. */
+        private Iterator<?> entryIt;
+
+        /** Swap listener. */
+        private GridCacheEntryInfoCollectSwapListener swapLsnr;
+
+        /** Partition. */
+        int part;
+
+        /**
+         * @param phase Phase.
+         * @param partIt Partition iterator.
+         * @param entryIt Entry iterator.
+         * @param swapLsnr Swap listener.
+         * @param part Partition.
+         */
+        public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
+            GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
+            this.phase = phase;
+            this.partIt = partIt;
+            this.entryIt = entryIt;
+            this.swapLsnr = swapLsnr;
+            this.part = part;
+        }
+    }
+}
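
The heart of the new supplier is the SupplyContext map above: instead of streaming a whole partition in one pass, the supplier sends at most maxBatchesCnt batches per demand message, parks its iterators keyed by (node id, topic), and resumes from that saved position when the demander asks again. The standalone sketch below (hypothetical names, generic data instead of cache entries, not the Ignite API) shows that park-and-resume pattern under those assumptions.

import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

/** Standalone sketch of batch-limited supplying with a saved/resumed context (illustrative only). */
class BatchedSupplier<T> {
    /** Saved position of an interrupted supply run. */
    private final class SupplyContext {
        private final Iterator<T> it;

        private SupplyContext(Iterator<T> it) {
            this.it = it;
        }
    }

    /** Contexts keyed by demander, mirroring the (node id, topic) key in the patch. */
    private final ConcurrentMap<String, SupplyContext> ctxMap = new ConcurrentHashMap<>();

    private final int batchSize;
    private final int maxBatches;

    BatchedSupplier(int batchSize, int maxBatches) {
        this.batchSize = batchSize;
        this.maxBatches = maxBatches;
    }

    /**
     * Handles one demand: resumes from a saved context if present, emits up to maxBatches
     * batches through the callback, and saves the iterator again if data remains.
     *
     * @return {@code true} when everything has been supplied, {@code false} if a context was saved.
     */
    boolean supply(String demanderKey, Iterable<T> data, Consumer<List<T>> send) {
        SupplyContext ctx = ctxMap.remove(demanderKey);

        Iterator<T> it = ctx != null ? ctx.it : data.iterator();

        int batches = 0;

        List<T> batch = new ArrayList<>(batchSize);

        while (it.hasNext()) {
            batch.add(it.next());

            if (batch.size() >= batchSize) {
                send.accept(batch);

                batch = new ArrayList<>(batchSize);

                if (++batches >= maxBatches && it.hasNext()) {
                    // Park the position; the next demand message resumes from here.
                    ctxMap.put(demanderKey, new SupplyContext(it));

                    return false;
                }
            }
        }

        if (!batch.isEmpty())
            send.accept(batch); // Last, possibly partial, batch.

        return true;
    }
}

In the real code the saved context also carries the phase, the per-phase iterators (entries, swap, listener) and the swap listeners; the sketch keeps only the batching and resume logic.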

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
deleted file mode 100644
index c1c9941..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ /dev/null
@@ -1,576 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jsr166.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.locks.*;
-
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
-
-/**
- * Thread pool for supplying partitions to demanding nodes.
- */
-class GridDhtPartitionSupplyPool {
-    /** */
-    private final GridCacheContext<?, ?> cctx;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private GridDhtPartitionTopology top;
-
-    /** */
-    private final boolean depEnabled;
-
-    /** Preload predicate. */
-    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
-    /** Supply context map. */
-    private ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
-
-    /** Done map. */
-    private ConcurrentHashMap8<T2, Boolean> doneMap = new ConcurrentHashMap8<>();//Todo: refactor
-
-    /**
-     * @param cctx Cache context.
-     * @param busyLock Shutdown lock.
-     */
-    GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
-        assert cctx != null;
-        assert busyLock != null;
-
-        this.cctx = cctx;
-
-        log = cctx.logger(getClass());
-
-        top = cctx.dht().topology();
-
-        int cnt = 0;
-
-        if (!cctx.kernalContext().clientNode()) {
-            while (cnt < cctx.config().getRebalanceThreadPoolSize()) {
-                final int idx = cnt;
-
-                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
-                    @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
-                        processMessage(m, id, idx);
-                    }
-                });
-
-                cnt++;
-            }
-        }
-
-        depEnabled = cctx.gridDeploy().enabled();
-    }
-
-    /**
-     * @param idx Index.
-     * @param id Node id.
-     * @return topic
-     */
-    static Object topic(int idx, int id) {
-        return TOPIC_CACHE.topic("SupplyPool", idx, id);
-    }
-
-    /**
-     *
-     */
-    void start() {
-    }
-
-    /**
-     *
-     */
-    void stop() {
-        top = null;
-    }
-
-    /**
-     * Sets preload predicate for supply pool.
-     *
-     * @param preloadPred Preload predicate.
-     */
-    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
-        this.preloadPred = preloadPred;
-    }
-
-    boolean logg = false;
-
-    /**
-     * @param d Demand message.
-     * @param id Node uuid.
-     */
-    private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int idx) {
-        assert d != null;
-        assert id != null;
-
-        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
-            return;
-
-        if (logg && cctx.name().equals("cache"))
-            System.out.println("S " + idx + " process message " + cctx.localNode().id());
-
-        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
-            d.updateSequence(), cctx.cacheId());
-
-        long preloadThrottle = cctx.config().getRebalanceThrottle();
-
-        ClusterNode node = cctx.discovery().node(id);
-
-        T2<UUID, Object> scId = new T2<>(id, d.topic());
-
-        try {
-            SupplyContext sctx = scMap.remove(scId);
-
-            if (!d.partitions().isEmpty()) {//Only first request contains partitions.
-                doneMap.remove(scId);
-            }
-
-            if (doneMap.get(scId) != null) {
-                if (logg && cctx.name().equals("cache"))
-                    System.out.println("S " + idx + " exit " + cctx.localNode().id());
-
-                return;
-            }
-
-            long bCnt = 0;
-
-            int phase = 0;
-
-            boolean newReq = true;
-
-            long maxBatchesCnt = 3;//Todo: param
-
-            if (sctx != null) {
-                phase = sctx.phase;
-
-                maxBatchesCnt = 1;
-            }
-
-            Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
-
-            while ((sctx != null && newReq) || partIt.hasNext()) {
-                int part = sctx != null && newReq ? sctx.part : partIt.next();
-
-                newReq = false;
-
-                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
-
-                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
-                    // Reply with partition of "-1" to let sender know that
-                    // this node is no longer an owner.
-                    s.missed(part);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Requested partition is not owned by local node [part=" + part +
-                            ", demander=" + id + ']');
-
-                    continue;
-                }
-
-                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
-
-                try {
-                    if (phase == 0 && cctx.isSwapOrOffheapEnabled()) {
-                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
-
-                        cctx.swap().addOffHeapListener(part, swapLsnr);
-                        cctx.swap().addSwapListener(part, swapLsnr);
-                    }
-
-                    boolean partMissing = false;
-
-                    if (phase == 0)
-                        phase = 1;
-
-                    if (phase == 1) {
-                        Iterator<GridDhtCacheEntry> entIt = sctx != null ?
-                            (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();
-
-                        while (entIt.hasNext()) {
-                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                // Demander no longer needs this partition, so we send '-1' partition and move on.
-                                s.missed(part);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Demanding node does not need requested partition [part=" + part +
-                                        ", nodeId=" + id + ']');
-
-                                partMissing = true;
-
-                                break;
-                            }
-
-                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                if (!reply(node, d, s))
-                                    return;
-
-                                // Throttle preloading.
-                                if (preloadThrottle > 0)
-                                    U.sleep(preloadThrottle);
-
-                                if (++bCnt >= maxBatchesCnt) {
-                                    saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
-
-                                    swapLsnr = null;
-
-                                    return;
-                                }
-                                else {
-                                    if (logg && cctx.name().equals("cache"))
-                                        System.out.println("S " + idx + " renew " + part + " " + cctx.localNode().id());
-
-                                    s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
-                                        cctx.cacheId());
-                                }
-                            }
-
-                            GridCacheEntryEx e = entIt.next();
-
-                            GridCacheEntryInfo info = e.info();
-
-                            if (info != null && !info.isNew()) {
-                                if (preloadPred == null || preloadPred.apply(info))
-                                    s.addEntry(part, info, cctx);
-                                else if (log.isDebugEnabled())
-                                    log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
-                                        info);
-                            }
-                        }
-
-                        if (partMissing)
-                            continue;
-
-                    }
-
-                    if (phase == 1)
-                        phase = 2;
-
-                    if (phase == 2 && cctx.isSwapOrOffheapEnabled()) {
-                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = sctx != null ?
-                            (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
-                            cctx.swap().iterator(part);
-
-                        // Iterator may be null if space does not exist.
-                        if (iter != null) {
-                            try {
-                                boolean prepared = false;
-
-                                while (iter.hasNext()) {
-                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                        // Demander no longer needs this partition,
-                                        // so we send '-1' partition and move on.
-                                        s.missed(part);
-
-                                        if (log.isDebugEnabled())
-                                            log.debug("Demanding node does not need requested partition " +
-                                                "[part=" + part + ", nodeId=" + id + ']');
-
-                                        partMissing = true;
-
-                                        break; // For.
-                                    }
-
-                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                        if (!reply(node, d, s))
-                                            return;
-
-                                        // Throttle preloading.
-                                        if (preloadThrottle > 0)
-                                            U.sleep(preloadThrottle);
-
-                                        if (++bCnt >= maxBatchesCnt) {
-                                            saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
-
-                                            swapLsnr = null;
-
-                                            return;
-                                        }
-                                        else {
-                                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
-                                                cctx.cacheId());
-                                        }
-                                    }
-
-                                    Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
-
-                                    GridCacheSwapEntry swapEntry = e.getValue();
-
-                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
-
-                                    info.keyBytes(e.getKey());
-                                    info.ttl(swapEntry.ttl());
-                                    info.expireTime(swapEntry.expireTime());
-                                    info.version(swapEntry.version());
-                                    info.value(swapEntry.value());
-
-                                    if (preloadPred == null || preloadPred.apply(info))
-                                        s.addEntry0(part, info, cctx);
-                                    else {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Rebalance predicate evaluated to false (will not send " +
-                                                "cache entry): " + info);
-
-                                        continue;
-                                    }
-
-                                    // Need to manually prepare cache message.
-                                    if (depEnabled && !prepared) {
-                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
-                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
-                                            swapEntry.valueClassLoaderId() != null ?
-                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
-                                                null;
-
-                                        if (ldr == null)
-                                            continue;
-
-                                        if (ldr instanceof GridDeploymentInfo) {
-                                            s.prepare((GridDeploymentInfo)ldr);
-
-                                            prepared = true;
-                                        }
-                                    }
-                                }
-
-                                if (partMissing)
-                                    continue;
-                            }
-                            finally {
-                                iter.close();
-                            }
-                        }
-                    }
-
-                    if (swapLsnr == null && sctx != null)
-                        swapLsnr = sctx.swapLsnr;
-
-                    // Stop receiving promote notifications.
-                    if (swapLsnr != null) {
-                        cctx.swap().removeOffHeapListener(part, swapLsnr);
-                        cctx.swap().removeSwapListener(part, swapLsnr);
-                    }
-
-                    if (phase == 2)
-                        phase = 3;
-
-                    if (phase == 3 && swapLsnr != null) {
-                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
-
-                        swapLsnr = null;
-
-                        Iterator<GridCacheEntryInfo> lsnrIt = sctx != null ?
-                            (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();
-
-                        while (lsnrIt.hasNext()) {
-                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                // Demander no longer needs this partition,
-                                // so we send '-1' partition and move on.
-                                s.missed(part);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Demanding node does not need requested partition " +
-                                        "[part=" + part + ", nodeId=" + id + ']');
-
-                                // No need to continue iteration over swap entries.
-                                break;
-                            }
-
-                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                if (!reply(node, d, s))
-                                    return;
-
-                                // Throttle preloading.
-                                if (preloadThrottle > 0)
-                                    U.sleep(preloadThrottle);
-
-                                if (++bCnt >= maxBatchesCnt) {
-                                    saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
-
-                                    return;
-                                }
-                                else {
-                                    s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
-                                        cctx.cacheId());
-                                }
-                            }
-
-                            GridCacheEntryInfo info = lsnrIt.next();
-
-                            if (preloadPred == null || preloadPred.apply(info))
-                                s.addEntry(part, info, cctx);
-                            else if (log.isDebugEnabled())
-                                log.debug("Rebalance predicate evaluated to false (will not send cache entry): " +
-                                    info);
-                        }
-                    }
-
-                    // Mark as last supply message.
-                    s.last(part);
-
-//                    if (logg && cctx.name().equals("cache"))
-//                        System.out.println("S " + idx + " last " + part + " " + cctx.localNode().id());
-
-                    phase = 0;
-
-                    sctx = null;
-                }
-                finally {
-                    loc.release();
-
-                    if (swapLsnr != null) {
-                        cctx.swap().removeOffHeapListener(part, swapLsnr);
-                        cctx.swap().removeSwapListener(part, swapLsnr);
-                    }
-                }
-            }
-
-            reply(node, d, s);
-
-            doneMap.put(scId, true);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send partition supply message to node: " + id, e);
-        }
-    }
-
-    /**
-     * @param n Node.
-     * @param s Supply message.
-     * @return {@code True} if message was sent, {@code false} if recipient left grid.
-     * @throws IgniteCheckedException If failed.
-     */
-    private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
-        throws IgniteCheckedException {
-        if (logg && cctx.name().equals("cache"))
-            System.out.println("S sent "+ cctx.localNode().id());
-
-        try {
-            if (log.isDebugEnabled())
-                log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
-
-            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
-
-            return true;
-        }
-        catch (ClusterTopologyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send partition supply message because node left grid: " + n.id());
-
-            return false;
-        }
-    }
-
-
-    /**
-     * Demand message wrapper.
-     */
-    private static class DemandMessage extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * @param sndId Sender ID.
-         * @param msg Message.
-         */
-        DemandMessage(UUID sndId, GridDhtPartitionDemandMessage msg) {
-            super(sndId, msg);
-        }
-
-        /**
-         * Empty constructor required for {@link Externalizable}.
-         */
-        public DemandMessage() {
-            // No-op.
-        }
-
-        /**
-         * @return Sender ID.
-         */
-        UUID senderId() {
-            return get1();
-        }
-
-        /**
-         * @return Message.
-         */
-        public GridDhtPartitionDemandMessage message() {
-            return get2();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']';
-        }
-    }
-
-
-    /**
-     * @param t Supply context key.
-     * @param phase Phase.
-     * @param partIt Partition it.
-     * @param entryIt Entry it.
-     * @param swapLsnr Swap listener.
-     */
-    private void saveSupplyContext(
-        T2 t,
-        int phase,
-        Iterator<Integer> partIt,
-        int part,
-        Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr) {
-        scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
-    }
-
-    private static class SupplyContext {
-        private int phase;
-
-        private Iterator<Integer> partIt;
-
-        private Iterator<?> entryIt;
-
-        private GridCacheEntryInfoCollectSwapListener swapLsnr;
-
-        int part;
-
-        public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
-            GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
-            this.phase = phase;
-            this.partIt = partIt;
-            this.entryIt = entryIt;
-            this.swapLsnr = swapLsnr;
-            this.part = part;
-        }
-    }
-}
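
The supplier loop deleted above follows a batch, throttle and checkpoint pattern: entries are added to a supply message until getRebalanceBatchSize() is reached, the message is sent, an optional preloadThrottle pause is taken, and after maxBatchesCnt batches a SupplyContext is saved so the next demand request resumes from the stored iterator; the final message for a partition is flagged with last(part). The following is a minimal, self-contained sketch of that pattern only. The names (Batcher, BatchSender, SavedContext, supply) are illustrative, not Ignite APIs, and the affinity re-checks, missed-partition handling and swap listeners of the real code are left out.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Illustrative batch/throttle/checkpoint loop; names are not Ignite APIs. */
public class Batcher<T> {
    /** Callback standing in for "reply to the demanding node". */
    public interface BatchSender<E> {
        void send(List<E> batch);
    }

    /** Saved position so a later call can resume where this one stopped. */
    public static final class SavedContext<E> {
        final Iterator<E> it;

        SavedContext(Iterator<E> it) {
            this.it = it;
        }
    }

    private final int batchSize;   // plays the role of getRebalanceBatchSize()
    private final long throttleMs; // plays the role of the preloadThrottle pause
    private final int maxBatches;  // plays the role of maxBatchesCnt

    public Batcher(int batchSize, long throttleMs, int maxBatches) {
        this.batchSize = batchSize;
        this.throttleMs = throttleMs;
        this.maxBatches = maxBatches;
    }

    /**
     * Drains the iterator in batches. Returns null when fully drained, or a saved
     * context when the per-call batch budget is exhausted and work should resume later.
     */
    public SavedContext<T> supply(Iterator<T> it, BatchSender<T> sender) throws InterruptedException {
        List<T> batch = new ArrayList<>();

        int sentBatches = 0;

        while (it.hasNext()) {
            batch.add(it.next());

            if (batch.size() >= batchSize) {
                sender.send(batch);

                batch = new ArrayList<>();

                if (throttleMs > 0)
                    Thread.sleep(throttleMs); // throttle preloading

                if (++sentBatches >= maxBatches)
                    return new SavedContext<>(it); // checkpoint: resume from here next time
            }
        }

        if (!batch.isEmpty())
            sender.send(batch); // final ("last") message for this run

        return null;
    }
}

The real supplier additionally re-checks partition affinity on every entry so it can mark a partition as missed when the demander no longer owns it; that is omitted here to keep the control flow visible.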

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index fbcbc37..a22f281 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -61,10 +61,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
 
     /** Partition suppliers. */
-    private GridDhtPartitionSupplyPool supplyPool;
+    private GridDhtPartitionSupplier supplier;
 
     /** Partition demanders. */
-    private GridDhtPartitionDemandPool demandPool;
+    private GridDhtPartitionDemander demander;
 
     /** Start future. */
     private GridFutureAdapter<Object> startFut;
@@ -159,8 +159,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                 }
             });
 
-        supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock);
-        demandPool = new GridDhtPartitionDemandPool(cctx, busyLock);
+        supplier = new GridDhtPartitionSupplier(cctx, busyLock);
+        demander = new GridDhtPartitionDemander(cctx, busyLock);
 
         cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
@@ -180,18 +180,18 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
         topVer.setIfGreater(startTopVer);
 
-        supplyPool.start();
-        demandPool.start();
+        supplier.start();
+        demander.start();
     }
 
     /** {@inheritDoc} */
     @Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
         super.preloadPredicate(preloadPred);
 
-        assert supplyPool != null && demandPool != null : "preloadPredicate may be called only after start()";
+        assert supplier != null && demander != null : "preloadPredicate may be called only after start()";
 
-        supplyPool.preloadPredicate(preloadPred);
-        demandPool.preloadPredicate(preloadPred);
+        supplier.preloadPredicate(preloadPred);
+        demander.preloadPredicate(preloadPred);
     }
 
     /** {@inheritDoc} */
@@ -205,11 +205,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         // Acquire write busy lock.
         busyLock.writeLock().lock();
 
-        if (supplyPool != null)
-            supplyPool.stop();
+        if (supplier != null)
+            supplier.stop();
 
-        if (demandPool != null)
-            demandPool.stop();
+        if (demander != null)
+            demander.stop();
 
         top = null;
     }
@@ -226,7 +226,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
             if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
                 U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
 
-                demandPool.syncFuture().listen(new CI1<Object>() {
+                demander.syncFuture().listen(new CI1<Object>() {
                     @Override public void apply(Object t) {
                         U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " +
                             "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
@@ -245,17 +245,17 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
-        demandPool.updateLastExchangeFuture(lastFut);
+        demander.updateLastExchangeFuture(lastFut);
     }
 
     /** {@inheritDoc} */
     @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
-        return demandPool.assign(exchFut);
+        return demander.assign(exchFut);
     }
 
     /** {@inheritDoc} */
     @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
-        demandPool.addAssignments(assignments, forcePreload);
+        demander.addAssignments(assignments, forcePreload);
     }
 
     /**
@@ -267,7 +267,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> syncFuture() {
-        return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
+        return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
     }
 
     /**
@@ -526,12 +526,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void forcePreload() {
-        demandPool.forcePreload();
+        demander.forcePreload();
     }
 
     /** {@inheritDoc} */
     @Override public void unwindUndeploys() {
-        demandPool.unwindUndeploys();
+        demander.unwindUndeploys();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
index 4992d19..5148753 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
@@ -34,7 +34,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
     /** */
     private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-    private static int TEST_SIZE = 10_024_000;
+    private static int TEST_SIZE = 1_024_000;
 
     /** cache name. */
     protected static String CACHE_NAME_DHT = "cache";
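
The rebalancing code in the preceding diffs is tuned through per-cache settings: the batch size bounds each supply message, the throttle inserts a pause between batches, the thread pool size sets demand/supply parallelism, and the timeout bounds how long rebalance messages may take. A minimal sketch of setting these knobs, assuming the Ignite 1.x API where the setters live on CacheConfiguration (all values are illustrative, not recommendations):

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

public class RebalanceTuningExample {
    public static void main(String[] args) {
        CacheConfiguration<Integer, byte[]> ccfg = new CacheConfiguration<>("cache");

        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); // wait for rebalancing to finish on start
        ccfg.setRebalanceBatchSize(512 * 1024);         // max size of a single supply message, in bytes
        ccfg.setRebalanceThrottle(100);                 // pause between supply batches, in ms
        ccfg.setRebalanceThreadPoolSize(4);             // demand/supply parallelism for this cache
        ccfg.setRebalanceTimeout(10_000);               // timeout for rebalance messages, in ms

        IgniteConfiguration cfg = new IgniteConfiguration();

        cfg.setCacheConfiguration(ccfg);

        try (Ignite ignite = Ignition.start(cfg)) {
            ignite.cache("cache").put(1, new byte[128]);
        }
    }
}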


[4/8] incubator-ignite git commit: ignite-1093 Code cleanup

Posted by av...@apache.org.
ignite-1093 Code cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/76ba5d95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/76ba5d95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/76ba5d95

Branch: refs/heads/ignite-1093
Commit: 76ba5d953133448b2753c0727098e7552ff79753
Parents: 9fbb559
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Mon Aug 10 12:46:26 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Mon Aug 10 12:46:26 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 21 ++++++++------------
 1 file changed, 8 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76ba5d95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 711b69b..fdd101e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -54,9 +54,6 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
  */
 @SuppressWarnings("NonConstantFieldWithUpperCaseName")
 public class GridDhtPartitionDemander {
-    /** Dummy message to wake up a blocking queue if a node leaves. */
-    private final SupplyMessage DUMMY_TOP = new SupplyMessage();
-
     /** */
     private final GridCacheContext<?, ?> cctx;
 
@@ -259,14 +256,6 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @param msg Message to check.
-     * @return {@code True} if dummy message.
-     */
-    private boolean dummyTopology(SupplyMessage msg) {
-        return msg == DUMMY_TOP;
-    }
-
-    /**
      * @param deque Deque to poll from.
      * @param time Time to wait.
      * @param w Worker.
@@ -558,8 +547,14 @@ public class GridDhtPartitionDemander {
 
                 cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId(), node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
                     @Override public void apply(UUID id, GridDhtPartitionSupplyMessage m) {
-                        handleSupplyMessage(idx, new SupplyMessage(id, m), node, topVer, top,
-                            exchFut, missed, d, remaining);
+                        enterBusy();
+
+                        try {
+                            handleSupplyMessage(idx, new SupplyMessage(id, m), node, topVer, top,
+                                exchFut, missed, d, remaining);
+                        } finally {
+                            leaveBusy();
+                        }
                     }
                 });
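
The added wrapper makes every supply-message handler run between enterBusy() and leaveBusy(), i.e. while holding the read side of the shared busy lock, so the stop path (which, as shown in an earlier hunk, acquires the write side of that lock) cannot proceed while a handler is in flight. A standalone sketch of that guard pattern with illustrative names (BusyGuardedHandler, onMessage), not the actual Ignite classes:

import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Illustrative busy-lock guard: handlers take the read lock, stop() takes the write lock. */
public class BusyGuardedHandler {
    private final ReentrantReadWriteLock busyLock = new ReentrantReadWriteLock();

    private volatile boolean stopped;

    /** Called from message-processing threads. */
    public void onMessage(Runnable handler) {
        if (!enterBusy())
            return; // component is stopping, drop the message

        try {
            handler.run();
        }
        finally {
            leaveBusy();
        }
    }

    /** Called once on shutdown; blocks until in-flight handlers have drained. */
    public void stop() {
        busyLock.writeLock().lock(); // waits for all read locks to be released

        try {
            stopped = true;
        }
        finally {
            busyLock.writeLock().unlock();
        }
    }

    private boolean enterBusy() {
        if (stopped || !busyLock.readLock().tryLock())
            return false;

        if (stopped) { // stop() may have completed just before the read lock was taken
            busyLock.readLock().unlock();

            return false;
        }

        return true;
    }

    private void leaveBusy() {
        busyLock.readLock().unlock();
    }
}

In the preloader the stop path relies on the shared busy lock alone; the stopped flag above only keeps this sketch self-contained.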
 


[7/8] incubator-ignite git commit: ignite-1093

Posted by av...@apache.org.
ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6a733ef9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6a733ef9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6a733ef9

Branch: refs/heads/ignite-1093
Commit: 6a733ef9165096f4c75e71874938cfc83a6998f8
Parents: db72f53
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Tue Aug 11 19:15:03 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Tue Aug 11 19:15:03 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 44 ++++++++++++++++++++
 1 file changed, 44 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6a733ef9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index fca9f53..30a04c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -120,6 +120,50 @@ public class GridDhtPartitionDemander {
      *
      */
     void start() {
+        int rebalanceOrder = cctx.config().getRebalanceOrder();
+
+        if (!CU.isMarshallerCache(cctx.name())) {
+            if (log.isDebugEnabled())
+                log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
+
+            try {
+                cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
+            }
+            catch (IgniteInterruptedCheckedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
+                        "[cacheName=" + cctx.name() + ']');
+
+                return;
+            }
+            catch (IgniteCheckedException e) {
+                throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
+            }
+        }
+
+        if (rebalanceOrder > 0) {
+            IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
+
+            try {
+                if (fut != null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
+                            ", rebalanceOrder=" + rebalanceOrder + ']');
+
+                    fut.get();
+                }
+            }
+            catch (IgniteInterruptedCheckedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
+                        "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
+
+                return;
+            }
+            catch (IgniteCheckedException e) {
+                throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
+            }
+        }
     }
 
     /**
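
The start() body added here makes a cache wait before rebalancing: first for the marshaller cache preload to finish, then, when the cache has a positive rebalance order, for the ordered preload future tied to that order (i.e. for caches that must rebalance earlier). On the configuration side that ordering comes from CacheConfiguration#setRebalanceOrder. A small sketch under the assumption of two interdependent caches (names and values are illustrative):

import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.configuration.CacheConfiguration;

public class RebalanceOrderExample {
    public static CacheConfiguration<?, ?>[] cacheConfigs() {
        // Dictionary-style cache that other caches depend on: rebalance it first.
        CacheConfiguration<Integer, String> dictCfg = new CacheConfiguration<>("dictionary");

        dictCfg.setRebalanceMode(CacheRebalanceMode.SYNC); // a non-zero order requires SYNC or ASYNC mode
        dictCfg.setRebalanceOrder(1);

        // Data cache whose rebalancing should start only after the dictionary cache is done.
        CacheConfiguration<Integer, byte[]> dataCfg = new CacheConfiguration<>("data");

        dataCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
        dataCfg.setRebalanceOrder(2);

        return new CacheConfiguration<?, ?>[] {dictCfg, dataCfg};
    }
}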


[6/8] incubator-ignite git commit: ignite-1093

Posted by av...@apache.org.
ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/db72f531
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/db72f531
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/db72f531

Branch: refs/heads/ignite-1093
Commit: db72f531342231cf45f49e395056c12cce3a79e5
Parents: d0b7d9f
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Tue Aug 11 18:58:05 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Tue Aug 11 18:58:05 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 135 +------------------
 .../dht/preloader/GridDhtPreloader.java         | 123 ++++++++++++++++-
 2 files changed, 122 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db72f531/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index e177dae..fca9f53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -65,9 +65,6 @@ public class GridDhtPartitionDemander {
     @GridToStringInclude
     private volatile SyncFuture syncFut;
 
-    /** Demand lock. */
-    private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
-
     /** Last timeout object. */
     private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
 
@@ -227,44 +224,10 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @param p Partition.
-     * @param topVer Topology version.
-     * @return Picked owners.
-     */
-    private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
-        Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
-
-        int affCnt = affNodes.size();
-
-        Collection<ClusterNode> rmts = remoteOwners(p, topVer);
-
-        int rmtCnt = rmts.size();
-
-        if (rmtCnt <= affCnt)
-            return rmts;
-
-        List<ClusterNode> sorted = new ArrayList<>(rmts);
-
-        // Sort in descending order, so nodes with higher order will be first.
-        Collections.sort(sorted, CU.nodeComparator(false));
-
-        // Pick newest nodes.
-        return sorted.subList(0, affCnt);
-    }
-
-    /**
-     * @param p Partition.
-     * @param topVer Topology version.
-     * @return Nodes owning this partition.
-     */
-    private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
-        return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
-    }
-
-    /**
      * @param assigns Assignments.
      * @param force {@code True} if dummy reassign.
      */
+
     void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Adding partition assignments: " + assigns);
@@ -408,20 +371,6 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     *
-     */
-    void unwindUndeploys() {
-        demandLock.writeLock().lock();
-
-        try {
-            cctx.deploy().unwind(cctx);
-        }
-        finally {
-            demandLock.writeLock().unlock();
-        }
-    }
-
-    /**
      * @param idx Index.
      * @param id Node id.
      * @param supply Supply.
@@ -666,88 +615,6 @@ public class GridDhtPartitionDemander {
     void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
         lastExchangeFut = lastFut;
     }
-
-    /**
-     * @param exchFut Exchange future.
-     * @return Assignments of partitions to nodes.
-     */
-    GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
-        // No assignments for disabled preloader.
-        GridDhtPartitionTopology top = cctx.dht().topology();
-
-        if (!cctx.rebalanceEnabled())
-            return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
-        int partCnt = cctx.affinity().partitions();
-
-        assert exchFut.forcePreload() || exchFut.dummyReassign() ||
-            exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
-            "Topology version mismatch [exchId=" + exchFut.exchangeId() +
-                ", topVer=" + top.topologyVersion() + ']';
-
-        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
-        AffinityTopologyVersion topVer = assigns.topologyVersion();
-
-        for (int p = 0; p < partCnt; p++) {
-            if (cctx.shared().exchange().hasPendingExchange()) {
-                if (log.isDebugEnabled())
-                    log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
-                        exchFut.exchangeId());
-
-                break;
-            }
-
-            // If partition belongs to local node.
-            if (cctx.affinity().localNode(p, topVer)) {
-                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
-                assert part != null;
-                assert part.id() == p;
-
-                if (part.state() != MOVING) {
-                    if (log.isDebugEnabled())
-                        log.debug("Skipping partition assignment (state is not MOVING): " + part);
-
-                    continue; // For.
-                }
-
-                Collection<ClusterNode> picked = pickedOwners(p, topVer);
-
-                if (picked.isEmpty()) {
-                    top.own(part);
-
-                    if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
-                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
-                        cctx.events().addPreloadEvent(p,
-                            EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
-                            discoEvt.type(), discoEvt.timestamp());
-                    }
-
-                    if (log.isDebugEnabled())
-                        log.debug("Owning partition as there are no other owners: " + part);
-                }
-                else {
-                    ClusterNode n = F.first(picked);
-
-                    GridDhtPartitionDemandMessage msg = assigns.get(n);
-
-                    if (msg == null) {
-                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
-                            top.updateSequence(),
-                            exchFut.exchangeId().topologyVersion(),
-                            cctx.cacheId()));
-                    }
-
-                    msg.addPartition(p);
-                }
-            }
-        }
-
-        return assigns;
-    }
-
 /**
  *
  */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db72f531/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 8a097ed..d994a19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -42,6 +42,7 @@ import java.util.concurrent.locks.*;
 
 import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 import static org.apache.ignite.internal.util.GridConcurrentFactory.*;
 
 /**
@@ -76,6 +77,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
         new ConcurrentHashMap8<>();
 
+    /** Demand lock. */
+    private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -250,7 +254,115 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
-        return demander.assign(exchFut);
+        // No assignments for disabled preloader.
+        GridDhtPartitionTopology top = cctx.dht().topology();
+
+        if (!cctx.rebalanceEnabled())
+            return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+
+        int partCnt = cctx.affinity().partitions();
+
+        assert exchFut.forcePreload() || exchFut.dummyReassign() ||
+            exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
+            "Topology version mismatch [exchId=" + exchFut.exchangeId() +
+                ", topVer=" + top.topologyVersion() + ']';
+
+        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+
+        AffinityTopologyVersion topVer = assigns.topologyVersion();
+
+        for (int p = 0; p < partCnt; p++) {
+            if (cctx.shared().exchange().hasPendingExchange()) {
+                if (log.isDebugEnabled())
+                    log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
+                        exchFut.exchangeId());
+
+                break;
+            }
+
+            // If partition belongs to local node.
+            if (cctx.affinity().localNode(p, topVer)) {
+                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+                assert part != null;
+                assert part.id() == p;
+
+                if (part.state() != MOVING) {
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping partition assignment (state is not MOVING): " + part);
+
+                    continue; // For.
+                }
+
+                Collection<ClusterNode> picked = pickedOwners(p, topVer);
+
+                if (picked.isEmpty()) {
+                    top.own(part);
+
+                    if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+
+                        cctx.events().addPreloadEvent(p,
+                            EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
+                            discoEvt.type(), discoEvt.timestamp());
+                    }
+
+                    if (log.isDebugEnabled())
+                        log.debug("Owning partition as there are no other owners: " + part);
+                }
+                else {
+                    ClusterNode n = F.first(picked);
+
+                    GridDhtPartitionDemandMessage msg = assigns.get(n);
+
+                    if (msg == null) {
+                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+                            top.updateSequence(),
+                            exchFut.exchangeId().topologyVersion(),
+                            cctx.cacheId()));
+                    }
+
+                    msg.addPartition(p);
+                }
+            }
+        }
+
+        return assigns;
+    }
+
+    /**
+     * @param p Partition.
+     * @param topVer Topology version.
+     * @return Picked owners.
+     */
+    private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
+
+        int affCnt = affNodes.size();
+
+        Collection<ClusterNode> rmts = remoteOwners(p, topVer);
+
+        int rmtCnt = rmts.size();
+
+        if (rmtCnt <= affCnt)
+            return rmts;
+
+        List<ClusterNode> sorted = new ArrayList<>(rmts);
+
+        // Sort in descending order, so nodes with higher order will be first.
+        Collections.sort(sorted, CU.nodeComparator(false));
+
+        // Pick newest nodes.
+        return sorted.subList(0, affCnt);
+    }
+
+    /**
+     * @param p Partition.
+     * @param topVer Topology version.
+     * @return Nodes owning this partition.
+     */
+    private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
+        return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
     }
 
     /** {@inheritDoc} */
@@ -531,7 +643,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void unwindUndeploys() {
-        demander.unwindUndeploys();
+        demandLock.writeLock().lock();
+
+        try {
+            cctx.deploy().unwind(cctx);
+        }
+        finally {
+            demandLock.writeLock().unlock();
+        }
     }
 
     /**
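
assign() and its pickedOwners()/remoteOwners() helpers now live in GridDhtPreloader: for each local partition in MOVING state the remote owners are collected, capped at the affinity node count, and the newest nodes (highest topology order) are preferred; the first picked node then receives the demand message for that partition. A self-contained sketch of just the selection step, with ClusterNode reduced to an id plus join order (illustrative names, not Ignite code):

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Illustrative version of the "pick newest owners" step used when building rebalance assignments. */
public class PickedOwners {
    /** Stand-in for ClusterNode: only the topology (join) order matters here. */
    public static class Node {
        final String id;
        final long order;

        Node(String id, long order) {
            this.id = id;
            this.order = order;
        }

        @Override public String toString() {
            return id + "(order=" + order + ")";
        }
    }

    /**
     * @param affCnt Number of affinity nodes for the partition.
     * @param rmts Remote nodes currently owning the partition.
     * @return At most affCnt owners, preferring the most recently joined nodes.
     */
    public static List<Node> pick(int affCnt, Collection<Node> rmts) {
        if (rmts.size() <= affCnt)
            return new ArrayList<>(rmts);

        List<Node> sorted = new ArrayList<>(rmts);

        // Sort in descending order, so nodes with higher order come first.
        Collections.sort(sorted, new Comparator<Node>() {
            @Override public int compare(Node n1, Node n2) {
                return Long.compare(n2.order, n1.order);
            }
        });

        // Pick the newest nodes.
        return sorted.subList(0, affCnt);
    }

    public static void main(String[] args) {
        List<Node> owners = new ArrayList<>();

        owners.add(new Node("a", 1));
        owners.add(new Node("b", 5));
        owners.add(new Node("c", 3));

        System.out.println(pick(2, owners)); // prints [b(order=5), c(order=3)]
    }
}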


[3/8] incubator-ignite git commit: ignite-1093 Code cleanup

Posted by av...@apache.org.
ignite-1093 Code cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9fbb5597
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9fbb5597
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9fbb5597

Branch: refs/heads/ignite-1093
Commit: 9fbb5597293a6805dad8fb10dec24b5628bb201f
Parents: 4776fec
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Mon Aug 10 12:08:20 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Mon Aug 10 12:08:20 2015 +0300

----------------------------------------------------------------------
 .../preloader/GridDhtPartitionDemandPool.java   | 1127 ------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 1127 ++++++++++++++++++
 .../dht/preloader/GridDhtPartitionSupplier.java |  582 +++++++++
 .../preloader/GridDhtPartitionSupplyPool.java   |  576 ---------
 .../dht/preloader/GridDhtPreloader.java         |   40 +-
 .../GridCacheMassiveRebalancingSelfTest.java    |    2 +-
 6 files changed, 1730 insertions(+), 1724 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
deleted file mode 100644
index 11645e9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ /dev/null
@@ -1,1127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.processors.timeout.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
-import org.jsr166.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
-import static org.apache.ignite.internal.processors.dr.GridDrType.*;
-
-/**
- * Thread pool for requesting partitions from other nodes
- * and populating local cache.
- */
-@SuppressWarnings("NonConstantFieldWithUpperCaseName")
-public class GridDhtPartitionDemandPool {
-    /** Dummy message to wake up a blocking queue if a node leaves. */
-    private final SupplyMessage DUMMY_TOP = new SupplyMessage();
-
-    /** */
-    private final GridCacheContext<?, ?> cctx;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private final ReadWriteLock busyLock;
-
-    /** */
-    @GridToStringInclude
-    private final Collection<DemandWorker> dmdWorkers;
-
-    /** Preload predicate. */
-    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
-    /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
-    @GridToStringInclude
-    private SyncFuture syncFut;
-
-    /** Preload timeout. */
-    private final AtomicLong timeout;
-
-    /** Allows demand threads to synchronize their step. */
-    private CyclicBarrier barrier;
-
-    /** Demand lock. */
-    private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
-
-    /** */
-    private int poolSize;
-
-    /** Last timeout object. */
-    private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
-
-    /** Last exchange future. */
-    private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
-
-    /**
-     * @param cctx Cache context.
-     * @param busyLock Shutdown lock.
-     */
-    public GridDhtPartitionDemandPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
-        assert cctx != null;
-        assert busyLock != null;
-
-        this.cctx = cctx;
-        this.busyLock = busyLock;
-
-        log = cctx.logger(getClass());
-
-        boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
-
-        poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
-        if (enabled) {
-            barrier = new CyclicBarrier(poolSize);
-
-            dmdWorkers = new ArrayList<>(poolSize);
-
-            for (int i = 0; i < poolSize; i++)
-                dmdWorkers.add(new DemandWorker(i));
-
-            syncFut = new SyncFuture(dmdWorkers);
-        }
-        else {
-            dmdWorkers = Collections.emptyList();
-
-            syncFut = new SyncFuture(dmdWorkers);
-
-            // Calling onDone() immediately since preloading is disabled.
-            syncFut.onDone();
-        }
-
-        timeout = new AtomicLong(cctx.config().getRebalanceTimeout());
-    }
-
-    /**
-     *
-     */
-    void start() {
-        if (poolSize > 0) {
-            for (DemandWorker w : dmdWorkers)
-                new IgniteThread(cctx.gridName(), "preloader-demand-worker", w).start();
-        }
-    }
-
-    /**
-     *
-     */
-    void stop() {
-        U.cancel(dmdWorkers);
-
-        if (log.isDebugEnabled())
-            log.debug("Before joining on demand workers: " + dmdWorkers);
-
-        U.join(dmdWorkers, log);
-
-        if (log.isDebugEnabled())
-            log.debug("After joining on demand workers: " + dmdWorkers);
-
-        lastExchangeFut = null;
-
-        lastTimeoutObj.set(null);
-    }
-
-    /**
-     * @return Future for {@link CacheRebalanceMode#SYNC} mode.
-     */
-    IgniteInternalFuture<?> syncFuture() {
-        return syncFut;
-    }
-
-    /**
-     * Sets preload predicate for demand pool.
-     *
-     * @param preloadPred Preload predicate.
-     */
-    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
-        this.preloadPred = preloadPred;
-    }
-
-    /**
-     * @return Size of this thread pool.
-     */
-    int poolSize() {
-        return poolSize;
-    }
-
-    /**
-     * Force preload.
-     */
-    void forcePreload() {
-        GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
-
-        if (obj != null)
-            cctx.time().removeTimeoutObject(obj);
-
-        final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
-
-        if (exchFut != null) {
-            if (log.isDebugEnabled())
-                log.debug("Forcing rebalance event for future: " + exchFut);
-
-            exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                    cctx.shared().exchange().forcePreloadExchange(exchFut);
-                }
-            });
-        }
-        else if (log.isDebugEnabled())
-            log.debug("Ignoring force rebalance request (no topology event happened yet).");
-    }
-
-    /**
-     * @return {@code true} if entered to busy state.
-     */
-    private boolean enterBusy() {
-        if (busyLock.readLock().tryLock())
-            return true;
-
-        if (log.isDebugEnabled())
-            log.debug("Failed to enter to busy state (demander is stopping): " + cctx.nodeId());
-
-        return false;
-    }
-
-    /**
-     * @param idx Topic index.
-     * @return Ordered message topic.
-     */
-    static Object topic(int idx, int cacheId, UUID nodeId) {
-        return TOPIC_CACHE.topic("DemandPool", nodeId, cacheId, idx); // TODO: remove nodeId
-    }
-
-    /**
-     *
-     */
-    private void leaveBusy() {
-        busyLock.readLock().unlock();
-    }
-
-    /**
-     * @param type Type.
-     * @param discoEvt Discovery event.
-     */
-    private void preloadEvent(int type, DiscoveryEvent discoEvt) {
-        preloadEvent(-1, type, discoEvt);
-    }
-
-    /**
-     * @param part Partition.
-     * @param type Type.
-     * @param discoEvt Discovery event.
-     */
-    private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
-        assert discoEvt != null;
-
-        cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
-    }
-
-    /**
-     * @param msg Message to check.
-     * @return {@code True} if dummy message.
-     */
-    private boolean dummyTopology(SupplyMessage msg) {
-        return msg == DUMMY_TOP;
-    }
-
-    /**
-     * @param deque Deque to poll from.
-     * @param time Time to wait.
-     * @param w Worker.
-     * @return Polled item.
-     * @throws InterruptedException If interrupted.
-     */
-    @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
-        assert w != null;
-
-        // There is currently a case where {@code interrupted}
-        // flag on a thread gets flipped during stop which causes the pool to hang.  This check
-        // will always make sure that interrupted flag gets reset before going into wait conditions.
-        // The true fix should actually make sure that interrupted flag does not get reset or that
-        // interrupted exception gets propagated. Until we find a real fix, this method should
-        // always work to make sure that there is no hanging during stop.
-        if (w.isCancelled())
-            Thread.currentThread().interrupt();
-
-        return deque.poll(time, MILLISECONDS);
-    }
-
-    /**
-     * @param p Partition.
-     * @param topVer Topology version.
-     * @return Picked owners.
-     */
-    private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
-        Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
-
-        int affCnt = affNodes.size();
-
-        Collection<ClusterNode> rmts = remoteOwners(p, topVer);
-
-        int rmtCnt = rmts.size();
-
-        if (rmtCnt <= affCnt)
-            return rmts;
-
-        List<ClusterNode> sorted = new ArrayList<>(rmts);
-
-        // Sort in descending order, so nodes with higher order will be first.
-        Collections.sort(sorted, CU.nodeComparator(false));
-
-        // Pick newest nodes.
-        return sorted.subList(0, affCnt);
-    }
-
-    /**
-     * @param p Partition.
-     * @param topVer Topology version.
-     * @return Nodes owning this partition.
-     */
-    private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
-        return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
-    }
-
-    /**
-     * @param assigns Assignments.
-     * @param force {@code True} if dummy reassign.
-     */
-    void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
-        if (log.isDebugEnabled())
-            log.debug("Adding partition assignments: " + assigns);
-
-        long delay = cctx.config().getRebalanceDelay();
-
-        if (delay == 0 || force) {
-            assert assigns != null;
-
-            synchronized (dmdWorkers) {
-                for (DemandWorker w : dmdWorkers)
-                    w.addAssignments(assigns);
-            }
-        }
-        else if (delay > 0) {
-            assert !force;
-
-            GridTimeoutObject obj = lastTimeoutObj.get();
-
-            if (obj != null)
-                cctx.time().removeTimeoutObject(obj);
-
-            final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
-
-            assert exchFut != null : "Delaying rebalance process without topology event.";
-
-            obj = new GridTimeoutObjectAdapter(delay) {
-                @Override public void onTimeout() {
-                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
-                            cctx.shared().exchange().forcePreloadExchange(exchFut);
-                        }
-                    });
-                }
-            };
-
-            lastTimeoutObj.set(obj);
-
-            cctx.time().addTimeoutObject(obj);
-        }
-    }
-
-    /**
-     *
-     */
-    void unwindUndeploys() {
-        demandLock.writeLock().lock();
-
-        try {
-            cctx.deploy().unwind(cctx);
-        }
-        finally {
-            demandLock.writeLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtPartitionDemandPool.class, this);
-    }
-
-    /**
-     *
-     */
-    private class DemandWorker extends GridWorker {
-        /** Worker ID. */
-        private int id;
-
-        /** Partition-to-node assignments. */
-        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
-
-        /** Hide worker logger and use cache logger instead. */
-        private IgniteLogger log = GridDhtPartitionDemandPool.this.log;
-
-        /**
-         * @param id Worker ID.
-         */
-        private DemandWorker(int id) {
-            super(cctx.gridName(), "preloader-demand-worker", GridDhtPartitionDemandPool.this.log);
-
-            assert id >= 0;
-
-            this.id = id;
-        }
-
-        /**
-         * @param assigns Assignments.
-         */
-        void addAssignments(GridDhtPreloaderAssignments assigns) {
-            assert assigns != null;
-
-            assignQ.offer(assigns);
-
-            if (log.isDebugEnabled())
-                log.debug("Added assignments to worker: " + this);
-        }
-
-        /**
-         * @return {@code True} if topology changed.
-         */
-        private boolean topologyChanged() {
-            return !assignQ.isEmpty() || cctx.shared().exchange().topologyChanged();
-        }
-
-        /**
-         * @param pick Node picked for preloading.
-         * @param p Partition.
-         * @param entry Preloaded entry.
-         * @param topVer Topology version.
-         * @return {@code False} if partition has become invalid during preloading.
-         * @throws IgniteInterruptedCheckedException If interrupted.
-         */
-        private boolean preloadEntry(
-            ClusterNode pick,
-            int p,
-            GridCacheEntryInfo entry,
-            AffinityTopologyVersion topVer
-        ) throws IgniteCheckedException {
-            try {
-                GridCacheEntryEx cached = null;
-
-                try {
-                    cached = cctx.dht().entryEx(entry.key());
-
-                    if (log.isDebugEnabled())
-                        log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
-
-                    if (cctx.dht().isIgfsDataCache() &&
-                        cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
-                        LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
-                            "value, will ignore rebalance entries): " + name());
-
-                        if (cached.markObsoleteIfEmpty(null))
-                            cached.context().cache().removeIfObsolete(cached.key());
-
-                        return true;
-                    }
-
-                    if (preloadPred == null || preloadPred.apply(entry)) {
-                        if (cached.initialValue(
-                            entry.value(),
-                            entry.version(),
-                            entry.ttl(),
-                            entry.expireTime(),
-                            true,
-                            topVer,
-                            cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
-                        )) {
-                            cctx.evicts().touch(cached, topVer); // Start tracking.
-
-                            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
-                                cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
-                                    (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
-                                    false, null, null, null);
-                        }
-                        else if (log.isDebugEnabled())
-                            log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
-                                ", part=" + p + ']');
-                    }
-                    else if (log.isDebugEnabled())
-                        log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
-                }
-                catch (GridCacheEntryRemovedException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
-                            cached.key() + ", part=" + p + ']');
-                }
-                catch (GridDhtInvalidPartitionException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Partition became invalid during rebalancing (will ignore): " + p);
-
-                    return false;
-                }
-            }
-            catch (IgniteInterruptedCheckedException e) {
-                throw e;
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
-                    cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
-            }
-
-            return true;
-        }
-
-        /**
-         * @param node Node to demand from.
-         * @param topVer Topology version.
-         * @param d Demand message.
-         * @param exchFut Exchange future.
-         * @return Missed partitions.
-         * @throws InterruptedException If interrupted.
-         * @throws ClusterTopologyCheckedException If node left.
-         * @throws IgniteCheckedException If failed to send message.
-         */
-        private Set<Integer> demandFromNode(
-            final ClusterNode node,
-            final AffinityTopologyVersion topVer,
-            final GridDhtPartitionDemandMessage d,
-            final GridDhtPartitionsExchangeFuture exchFut
-        ) throws InterruptedException, IgniteCheckedException {
-            final GridDhtPartitionTopology top = cctx.dht().topology();
-
-            long timeout = GridDhtPartitionDemandPool.this.timeout.get();
-
-            d.timeout(timeout);
-            d.workerId(id);
-
-            final Set<Integer> missed = new HashSet<>();
-
-            final ConcurrentHashMap8<Integer, Boolean> remaining = new ConcurrentHashMap8<>();
-
-            for (int p : d.partitions())
-                remaining.put(p, false);
-
-            if (isCancelled() || topologyChanged())
-                return missed;
-
-            int threadCnt = cctx.config().getRebalanceThreadPoolSize(); //todo = getRebalanceThreadPoolSize / assigns.count
-
-            List<Set<Integer>> sParts = new ArrayList<>(threadCnt);
-
-            int cnt = 0;
-
-            while (cnt < threadCnt) {
-                sParts.add(new HashSet<Integer>());
-
-                final int idx = cnt;
-
-                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId(), node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                    @Override public void apply(UUID id, GridDhtPartitionSupplyMessage m) {
-                        handleSupplyMessage(idx, new SupplyMessage(id, m), node, topVer, top,
-                            exchFut, missed, d, remaining);
-                    }
-                });
-
-                cnt++;
-            }
-
-            Iterator<Integer> it = d.partitions().iterator();
-
-            cnt = 0;
-
-            while (it.hasNext()) {
-                sParts.get(cnt % threadCnt).add(it.next());
-
-                cnt++;
-            }
-
-            try {
-                cnt = 0;
-
-                while (cnt < threadCnt) {
-
-                    // Create copy.
-                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
-
-                    initD.topic(topic(cnt, cctx.cacheId(),node.id()));
-
-                    try {
-                        if (logg && cctx.name().equals("cache"))
-                        System.out.println("D "+cnt + " initial Demand "+" "+cctx.localNode().id());
-
-                        cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to send partition demand message to local node", e);
-                    }
-
-                    cnt++;
-                }
-
-                do {
-                    U.sleep(1000);//Todo: improve
-                }
-                while (!isCancelled() && !topologyChanged() && !remaining.isEmpty());
-
-                return missed;
-            }
-            finally {
-                cnt = 0;
-
-                while (cnt < threadCnt) {
-                    cctx.io().removeOrderedHandler(topic(cnt,cctx.cacheId(), node.id()));
-
-                    cnt++;
-                }
-            }
-        }
-
-        boolean logg = false;
-
-        /**
-         * @param s Supply message.
-         * @param node Node.
-         * @param topVer Topology version.
-         * @param top Topology.
-         * @param exchFut Exchange future.
-         * @param missed Missed.
-         * @param d initial DemandMessage.
-         */
-        private void handleSupplyMessage(
-            int idx,
-            SupplyMessage s,
-            ClusterNode node,
-            AffinityTopologyVersion topVer,
-            GridDhtPartitionTopology top,
-            GridDhtPartitionsExchangeFuture exchFut,
-            Set<Integer> missed,
-            GridDhtPartitionDemandMessage d,
-            ConcurrentHashMap8 remaining) {
-
-            if (logg && cctx.name().equals("cache"))
-            System.out.println("D "+idx + " handled supply message "+ cctx.localNode().id());
-
-            // Check that message was received from expected node.
-            if (!s.senderId().equals(node.id())) {
-                U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
-                    ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
-
-                return;
-            }
-
-            if (topologyChanged())
-                return;
-
-            if (log.isDebugEnabled())
-                log.debug("Received supply message: " + s);
-
-            GridDhtPartitionSupplyMessage supply = s.supply();
-
-            // Check whether there were class loading errors on unmarshal
-            if (supply.classError() != null) {
-                if (log.isDebugEnabled())
-                    log.debug("Class got undeployed during preloading: " + supply.classError());
-
-                return;
-            }
-
-            // Preload.
-            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
-                int p = e.getKey();
-
-                if (cctx.affinity().localNode(p, topVer)) {
-                    GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
-                    assert part != null;
-
-                    if (part.state() == MOVING) {
-                        boolean reserved = part.reserve();
-
-                        assert reserved : "Failed to reserve partition [gridName=" +
-                            cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
-
-                        part.lock();
-
-                        try {
-                            // Loop through all received entries and try to preload them.
-                            for (GridCacheEntryInfo entry : e.getValue().infos()) {
-                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Preloading is not permitted for entry due to " +
-                                            "evictions [key=" + entry.key() +
-                                            ", ver=" + entry.version() + ']');
-
-                                    continue;
-                                }
-                                try {
-                                    if (!preloadEntry(node, p, entry, topVer)) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Got entries for invalid partition during " +
-                                                "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
-
-                                        break;
-                                    }
-                                }
-                                catch (IgniteCheckedException ex) {
-                                    cancel();
-
-                                    return;
-                                }
-                            }
-
-                            boolean last = supply.last().contains(p);
-
-                            // If message was last for this partition,
-                            // then we take ownership.
-                            if (last) {
-                                top.own(part);//todo: close future?
-
-//                                if (logg && cctx.name().equals("cache"))
-//                                    System.out.println("D "+idx + " last "+ p +" "+ cctx.localNode().id());
-
-                                remaining.remove(p);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Finished rebalancing partition: " + part);
-
-                                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
-                                    preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
-                                        exchFut.discoveryEvent());
-                            }
-                        }
-                        finally {
-                            part.unlock();
-                            part.release();
-                        }
-                    }
-                    else {
-                        remaining.remove(p);
-
-                        if (log.isDebugEnabled())
-                            log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
-                    }
-                }
-                else {
-                    remaining.remove(p);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
-                }
-            }
-
-            for (Integer miss : s.supply().missed())
-                remaining.remove(miss);
-
-            // Only request partitions based on latest topology version.
-            for (Integer miss : s.supply().missed())
-                if (cctx.affinity().localNode(miss, topVer))
-                    missed.add(miss);
-
-            if (!remaining.isEmpty()) {
-                try {
-                    // Create copy.
-                    GridDhtPartitionDemandMessage nextD =
-                        new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
-
-                    nextD.topic(topic(idx, cctx.cacheId(), node.id()));
-
-                    // Send demand message.
-                    cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(idx, cctx.cacheId()),
-                        nextD, cctx.ioPolicy(), d.timeout());
-
-                    if (logg && cctx.name().equals("cache"))
-                        System.out.println("D " + idx + " ack  " + cctx.localNode().id());
-                }
-                catch (IgniteCheckedException ex) {
-                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
-                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
-
-                    cancel();
-                }
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            try {
-                int rebalanceOrder = cctx.config().getRebalanceOrder();
-
-                if (!CU.isMarshallerCache(cctx.name())) {
-                    if (log.isDebugEnabled())
-                        log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
-
-                    try {
-                        cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
-                    }
-                    catch (IgniteInterruptedCheckedException ignored) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
-                                "[cacheName=" + cctx.name() + ']');
-
-                        return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
-                    }
-                }
-
-                if (rebalanceOrder > 0) {
-                    IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
-
-                    try {
-                        if (fut != null) {
-                            if (log.isDebugEnabled())
-                                log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
-                                    ", rebalanceOrder=" + rebalanceOrder + ']');
-
-                            fut.get();
-                        }
-                    }
-                    catch (IgniteInterruptedCheckedException ignored) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
-                                "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
-
-                        return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
-                    }
-                }
-
-                GridDhtPartitionsExchangeFuture exchFut = null;
-
-                boolean stopEvtFired = false;
-
-                while (!isCancelled()) {
-                    try {
-                        barrier.await();
-
-                        if (id == 0 && exchFut != null && !exchFut.dummy() &&
-                            cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED)) {
-
-                            if (!cctx.isReplicated() || !stopEvtFired) {
-                                preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
-                                stopEvtFired = true;
-                            }
-                        }
-                    }
-                    catch (BrokenBarrierException ignore) {
-                        throw new InterruptedException("Demand worker stopped.");
-                    }
-
-                    // Sync up all demand threads at this step.
-                    GridDhtPreloaderAssignments assigns = null;
-
-                    while (assigns == null)
-                        assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
-
-                    demandLock.readLock().lock();
-
-                    try {
-                        exchFut = assigns.exchangeFuture();
-
-                        // Assignments are empty if preloading is disabled.
-                        if (assigns.isEmpty())
-                            continue;
-
-                        boolean resync = false;
-
-                        // While.
-                        // =====
-                        while (!isCancelled() && !topologyChanged() && !resync) {
-                            Collection<Integer> missed = new HashSet<>();
-
-                            // For.
-                            // ===
-                            for (ClusterNode node : assigns.keySet()) {
-                                if (topologyChanged() || isCancelled())
-                                    break; // For.
-
-                                GridDhtPartitionDemandMessage d = assigns.remove(node);
-
-                                // If another thread is already processing this message,
-                                // move to the next node.
-                                if (d == null)
-                                    continue; // For.
-
-                                try {
-                                    Set<Integer> set = demandFromNode(node, assigns.topologyVersion(), d, exchFut);
-
-                                    if (!set.isEmpty()) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" +
-                                                set + ']');
-
-                                        missed.addAll(set);
-                                    }
-                                }
-                                catch (IgniteInterruptedCheckedException e) {
-                                    throw e;
-                                }
-                                catch (ClusterTopologyCheckedException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
-                                            ", msg=" + e.getMessage() + ']');
-
-                                    resync = true;
-
-                                    break; // For.
-                                }
-                                catch (IgniteCheckedException e) {
-                                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
-                                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
-                                }
-                            }
-
-                            // Processed missed entries.
-                            if (!missed.isEmpty()) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Reassigning partitions that were missed: " + missed);
-
-                                assert exchFut.exchangeId() != null;
-
-                                cctx.shared().exchange().forceDummyExchange(true, exchFut);
-                            }
-                            else
-                                break; // While.
-                        }
-                    }
-                    finally {
-                        demandLock.readLock().unlock();
-
-                        syncFut.onWorkerDone(this);
-                    }
-
-                    cctx.shared().exchange().scheduleResendPartitions();
-                }
-            }
-            finally {
-                // Safety.
-                syncFut.onWorkerDone(this);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(DemandWorker.class, this, "assignQ", assignQ, "super", super.toString());
-        }
-    }
-
-    /**
-     * Sets last exchange future.
-     *
-     * @param lastFut Last future to set.
-     */
-    void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
-        lastExchangeFut = lastFut;
-    }
-
-    /**
-     * @param exchFut Exchange future.
-     * @return Assignments of partitions to nodes.
-     */
-    GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
-        // No assignments for disabled preloader.
-        GridDhtPartitionTopology top = cctx.dht().topology();
-
-        if (!cctx.rebalanceEnabled())
-            return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
-        int partCnt = cctx.affinity().partitions();
-
-        assert exchFut.forcePreload() || exchFut.dummyReassign() ||
-            exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
-            "Topology version mismatch [exchId=" + exchFut.exchangeId() +
-                ", topVer=" + top.topologyVersion() + ']';
-
-        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
-
-        AffinityTopologyVersion topVer = assigns.topologyVersion();
-
-        for (int p = 0; p < partCnt; p++) {
-            if (cctx.shared().exchange().hasPendingExchange()) {
-                if (log.isDebugEnabled())
-                    log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
-                        exchFut.exchangeId());
-
-                break;
-            }
-
-            // If partition belongs to local node.
-            if (cctx.affinity().localNode(p, topVer)) {
-                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
-                assert part != null;
-                assert part.id() == p;
-
-                if (part.state() != MOVING) {
-                    if (log.isDebugEnabled())
-                        log.debug("Skipping partition assignment (state is not MOVING): " + part);
-
-                    continue; // For.
-                }
-
-                Collection<ClusterNode> picked = pickedOwners(p, topVer);
-
-                if (picked.isEmpty()) {
-                    top.own(part);
-
-                    if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
-                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
-                        cctx.events().addPreloadEvent(p,
-                            EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
-                            discoEvt.type(), discoEvt.timestamp());
-                    }
-
-                    if (log.isDebugEnabled())
-                        log.debug("Owning partition as there are no other owners: " + part);
-                }
-                else {
-                    ClusterNode n = F.first(picked);
-
-                    GridDhtPartitionDemandMessage msg = assigns.get(n);
-
-                    if (msg == null) {
-                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
-                            top.updateSequence(),
-                            exchFut.exchangeId().topologyVersion(),
-                            cctx.cacheId()));
-                    }
-
-                    msg.addPartition(p);
-                }
-            }
-        }
-
-        return assigns;
-    }
-
-    /**
-     *
-     */
-    private class SyncFuture extends GridFutureAdapter<Object> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Remaining workers. */
-        private Collection<DemandWorker> remaining;
-
-        /**
-         * @param workers List of workers.
-         */
-        private SyncFuture(Collection<DemandWorker> workers) {
-            assert workers.size() == poolSize();
-
-            remaining = Collections.synchronizedList(new LinkedList<>(workers));
-        }
-
-        /**
-         * @param w Worker who iterated through all partitions.
-         */
-        void onWorkerDone(DemandWorker w) {
-            if (isDone())
-                return;
-
-            if (remaining.remove(w))
-                if (log.isDebugEnabled())
-                    log.debug("Completed full partition iteration for worker [worker=" + w + ']');
-
-            if (remaining.isEmpty()) {
-                if (log.isDebugEnabled())
-                    log.debug("Completed sync future.");
-
-                onDone();
-            }
-        }
-    }
-
-    /**
-     * Supply message wrapper.
-     */
-    private static class SupplyMessage {
-        /** Sender ID. */
-        private UUID sndId;
-
-        /** Supply message. */
-        private GridDhtPartitionSupplyMessage supply;
-
-        /**
-         * Dummy constructor.
-         */
-        private SupplyMessage() {
-            // No-op.
-        }
-
-        /**
-         * @param sndId Sender ID.
-         * @param supply Supply message.
-         */
-        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
-            this.sndId = sndId;
-            this.supply = supply;
-        }
-
-        /**
-         * @return Sender ID.
-         */
-        UUID senderId() {
-            return sndId;
-        }
-
-        /**
-         * @return Message.
-         */
-        GridDhtPartitionSupplyMessage supply() {
-            return supply;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(SupplyMessage.class, this);
-        }
-    }
-}
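The demandFromNode() logic (in both the removed pool above and the new GridDhtPartitionDemander below) splits the demanded partitions across getRebalanceThreadPoolSize() threads round-robin, so every thread exchanges demand/supply messages over its own ordered topic. A minimal standalone sketch of that split, for illustration only (not part of the patch; class and method names here are invented):

    import java.util.*;

    public class PartitionSplitSketch {
        /** Splits partition IDs into threadCnt disjoint sets, round-robin. */
        static List<Set<Integer>> split(Collection<Integer> parts, int threadCnt) {
            List<Set<Integer>> sParts = new ArrayList<>(threadCnt);

            for (int i = 0; i < threadCnt; i++)
                sParts.add(new HashSet<Integer>());

            int cnt = 0;

            for (Integer p : parts)
                sParts.get(cnt++ % threadCnt).add(p);

            return sParts;
        }

        public static void main(String[] args) {
            // 8 partitions over 3 demand threads -> three disjoint sets covering all partitions.
            System.out.println(split(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7), 3));
        }
    }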

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
new file mode 100644
index 0000000..711b69b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -0,0 +1,1127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.timeout.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+import static org.apache.ignite.internal.processors.dr.GridDrType.*;
+
+/**
+ * Thread pool for requesting partitions from other nodes
+ * and populating local cache.
+ */
+@SuppressWarnings("NonConstantFieldWithUpperCaseName")
+public class GridDhtPartitionDemander {
+    /** Dummy message to wake up a blocking queue if a node leaves. */
+    private final SupplyMessage DUMMY_TOP = new SupplyMessage();
+
+    /** */
+    private final GridCacheContext<?, ?> cctx;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private final ReadWriteLock busyLock;
+
+    /** */
+    @GridToStringInclude
+    private final Collection<DemandWorker> dmdWorkers;
+
+    /** Preload predicate. */
+    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+    /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
+    @GridToStringInclude
+    private SyncFuture syncFut;
+
+    /** Preload timeout. */
+    private final AtomicLong timeout;
+
+    /** Allows demand threads to synchronize their step. */
+    private CyclicBarrier barrier;
+
+    /** Demand lock. */
+    private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
+
+    /** */
+    private int poolSize;
+
+    /** Last timeout object. */
+    private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
+
+    /** Last exchange future. */
+    private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
+
+    /**
+     * @param cctx Cache context.
+     * @param busyLock Shutdown lock.
+     */
+    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
+        assert cctx != null;
+        assert busyLock != null;
+
+        this.cctx = cctx;
+        this.busyLock = busyLock;
+
+        log = cctx.logger(getClass());
+
+        boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
+
+        poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0;
+
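+        // When rebalancing is enabled, create one demand worker per configured rebalance thread.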
+        if (enabled) {
+            barrier = new CyclicBarrier(poolSize);
+
+            dmdWorkers = new ArrayList<>(poolSize);
+
+            for (int i = 0; i < poolSize; i++)
+                dmdWorkers.add(new DemandWorker(i));
+
+            syncFut = new SyncFuture(dmdWorkers);
+        }
+        else {
+            dmdWorkers = Collections.emptyList();
+
+            syncFut = new SyncFuture(dmdWorkers);
+
+            // Calling onDone() immediately since preloading is disabled.
+            syncFut.onDone();
+        }
+
+        timeout = new AtomicLong(cctx.config().getRebalanceTimeout());
+    }
+
+    /**
+     *
+     */
+    void start() {
+        if (poolSize > 0) {
+            for (DemandWorker w : dmdWorkers)
+                new IgniteThread(cctx.gridName(), "preloader-demand-worker", w).start();
+        }
+    }
+
+    /**
+     *
+     */
+    void stop() {
+        U.cancel(dmdWorkers);
+
+        if (log.isDebugEnabled())
+            log.debug("Before joining on demand workers: " + dmdWorkers);
+
+        U.join(dmdWorkers, log);
+
+        if (log.isDebugEnabled())
+            log.debug("After joining on demand workers: " + dmdWorkers);
+
+        lastExchangeFut = null;
+
+        lastTimeoutObj.set(null);
+    }
+
+    /**
+     * @return Future for {@link CacheRebalanceMode#SYNC} mode.
+     */
+    IgniteInternalFuture<?> syncFuture() {
+        return syncFut;
+    }
+
+    /**
+     * Sets preload predicate for demand pool.
+     *
+     * @param preloadPred Preload predicate.
+     */
+    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+        this.preloadPred = preloadPred;
+    }
+
+    /**
+     * @return Size of this thread pool.
+     */
+    int poolSize() {
+        return poolSize;
+    }
+
+    /**
+     * Force preload.
+     */
+    void forcePreload() {
+        GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
+
+        if (obj != null)
+            cctx.time().removeTimeoutObject(obj);
+
+        final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
+
+        if (exchFut != null) {
+            if (log.isDebugEnabled())
+                log.debug("Forcing rebalance event for future: " + exchFut);
+
+            exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                    cctx.shared().exchange().forcePreloadExchange(exchFut);
+                }
+            });
+        }
+        else if (log.isDebugEnabled())
+            log.debug("Ignoring force rebalance request (no topology event happened yet).");
+    }
+
+    /**
+     * @return {@code true} if entered to busy state.
+     */
+    private boolean enterBusy() {
+        if (busyLock.readLock().tryLock())
+            return true;
+
+        if (log.isDebugEnabled())
+            log.debug("Failed to enter to busy state (demander is stopping): " + cctx.nodeId());
+
+        return false;
+    }
+
+    /**
+     * @param idx Ordered handler index (one per rebalance thread).
+     * @param cacheId Cache ID.
+     * @param nodeId Remote node ID.
+     * @return Ordered topic for demand/supply messages of the given thread.
+     */
+    static Object topic(int idx, int cacheId, UUID nodeId) {
+        return TOPIC_CACHE.topic("DemandPool", nodeId, cacheId, idx); // TODO: remove nodeId.
+    }
+
+    /**
+     *
+     */
+    private void leaveBusy() {
+        busyLock.readLock().unlock();
+    }
+
+    /**
+     * @param type Type.
+     * @param discoEvt Discovery event.
+     */
+    private void preloadEvent(int type, DiscoveryEvent discoEvt) {
+        preloadEvent(-1, type, discoEvt);
+    }
+
+    /**
+     * @param part Partition.
+     * @param type Type.
+     * @param discoEvt Discovery event.
+     */
+    private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
+        assert discoEvt != null;
+
+        cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+    }
+
+    /**
+     * @param msg Message to check.
+     * @return {@code True} if dummy message.
+     */
+    private boolean dummyTopology(SupplyMessage msg) {
+        return msg == DUMMY_TOP;
+    }
+
+    /**
+     * @param deque Deque to poll from.
+     * @param time Time to wait.
+     * @param w Worker.
+     * @return Polled item.
+     * @throws InterruptedException If interrupted.
+     */
+    @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
+        assert w != null;
+
+        // There is currently a case where {@code interrupted}
+        // flag on a thread gets flipped during stop which causes the pool to hang.  This check
+        // will always make sure that interrupted flag gets reset before going into wait conditions.
+        // The true fix should actually make sure that interrupted flag does not get reset or that
+        // interrupted exception gets propagated. Until we find a real fix, this method should
+        // always work to make sure that there is no hanging during stop.
+        if (w.isCancelled())
+            Thread.currentThread().interrupt();
+
+        return deque.poll(time, MILLISECONDS);
+    }
+
+    /**
+     * @param p Partition.
+     * @param topVer Topology version.
+     * @return Picked owners.
+     */
+    private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
+
+        int affCnt = affNodes.size();
+
+        Collection<ClusterNode> rmts = remoteOwners(p, topVer);
+
+        int rmtCnt = rmts.size();
+
+        if (rmtCnt <= affCnt)
+            return rmts;
+
+        List<ClusterNode> sorted = new ArrayList<>(rmts);
+
+        // Sort in descending order, so nodes with higher order will be first.
+        Collections.sort(sorted, CU.nodeComparator(false));
+
+        // Pick newest nodes.
+        return sorted.subList(0, affCnt);
+    }
+
+    /**
+     * @param p Partition.
+     * @param topVer Topology version.
+     * @return Nodes owning this partition.
+     */
+    private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
+        return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
+    }
+
+    /**
+     * @param assigns Assignments.
+     * @param force {@code True} if dummy reassign.
+     */
+    void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
+        if (log.isDebugEnabled())
+            log.debug("Adding partition assignments: " + assigns);
+
+        long delay = cctx.config().getRebalanceDelay();
+
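+        // Zero delay or forced preload: hand the assignments to the demand workers right away.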
+        if (delay == 0 || force) {
+            assert assigns != null;
+
+            synchronized (dmdWorkers) {
+                for (DemandWorker w : dmdWorkers)
+                    w.addAssignments(assigns);
+            }
+        }
+        else if (delay > 0) {
+            assert !force;
+
+            GridTimeoutObject obj = lastTimeoutObj.get();
+
+            if (obj != null)
+                cctx.time().removeTimeoutObject(obj);
+
+            final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
+
+            assert exchFut != null : "Delaying rebalance process without topology event.";
+
+            obj = new GridTimeoutObjectAdapter(delay) {
+                @Override public void onTimeout() {
+                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
+                            cctx.shared().exchange().forcePreloadExchange(exchFut);
+                        }
+                    });
+                }
+            };
+
+            lastTimeoutObj.set(obj);
+
+            cctx.time().addTimeoutObject(obj);
+        }
+    }
+
+    /**
+     *
+     */
+    void unwindUndeploys() {
+        demandLock.writeLock().lock();
+
+        try {
+            cctx.deploy().unwind(cctx);
+        }
+        finally {
+            demandLock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtPartitionDemander.class, this);
+    }
+
+    /**
+     *
+     */
+    private class DemandWorker extends GridWorker {
+        /** Worker ID. */
+        private int id;
+
+        /** Partition-to-node assignments. */
+        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
+
+        /** Hide worker logger and use cache logger instead. */
+        private IgniteLogger log = GridDhtPartitionDemander.this.log;
+
+        /**
+         * @param id Worker ID.
+         */
+        private DemandWorker(int id) {
+            super(cctx.gridName(), "preloader-demand-worker", GridDhtPartitionDemander.this.log);
+
+            assert id >= 0;
+
+            this.id = id;
+        }
+
+        /**
+         * @param assigns Assignments.
+         */
+        void addAssignments(GridDhtPreloaderAssignments assigns) {
+            assert assigns != null;
+
+            assignQ.offer(assigns);
+
+            if (log.isDebugEnabled())
+                log.debug("Added assignments to worker: " + this);
+        }
+
+        /**
+         * @return {@code True} if topology changed.
+         */
+        private boolean topologyChanged() {
+            return !assignQ.isEmpty() || cctx.shared().exchange().topologyChanged();
+        }
+
+        /**
+         * @param pick Node picked for preloading.
+         * @param p Partition.
+         * @param entry Preloaded entry.
+         * @param topVer Topology version.
+         * @return {@code False} if partition has become invalid during preloading.
+         * @throws IgniteInterruptedCheckedException If interrupted.
+         * @throws IgniteCheckedException If failed to cache rebalanced entry.
+         */
+        private boolean preloadEntry(
+            ClusterNode pick,
+            int p,
+            GridCacheEntryInfo entry,
+            AffinityTopologyVersion topVer
+        ) throws IgniteCheckedException {
+            try {
+                GridCacheEntryEx cached = null;
+
+                try {
+                    cached = cctx.dht().entryEx(entry.key());
+
+                    if (log.isDebugEnabled())
+                        log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
+
+                    if (cctx.dht().isIgfsDataCache() &&
+                        cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
+                        LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
+                            "value, will ignore rebalance entries): " + name());
+
+                        if (cached.markObsoleteIfEmpty(null))
+                            cached.context().cache().removeIfObsolete(cached.key());
+
+                        return true;
+                    }
+
+                    if (preloadPred == null || preloadPred.apply(entry)) {
+                        if (cached.initialValue(
+                            entry.value(),
+                            entry.version(),
+                            entry.ttl(),
+                            entry.expireTime(),
+                            true,
+                            topVer,
+                            cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+                        )) {
+                            cctx.evicts().touch(cached, topVer); // Start tracking.
+
+                            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
+                                cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
+                                    (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
+                                    false, null, null, null);
+                        }
+                        else if (log.isDebugEnabled())
+                            log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
+                                ", part=" + p + ']');
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
+                            cached.key() + ", part=" + p + ']');
+                }
+                catch (GridDhtInvalidPartitionException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Partition became invalid during rebalancing (will ignore): " + p);
+
+                    return false;
+                }
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw e;
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
+                    cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+            }
+
+            return true;
+        }
+
+        /**
+         * @param node Node to demand from.
+         * @param topVer Topology version.
+         * @param d Demand message.
+         * @param exchFut Exchange future.
+         * @return Missed partitions.
+         * @throws InterruptedException If interrupted.
+         * @throws ClusterTopologyCheckedException If node left.
+         * @throws IgniteCheckedException If failed to send message.
+         */
+        private Set<Integer> demandFromNode(
+            final ClusterNode node,
+            final AffinityTopologyVersion topVer,
+            final GridDhtPartitionDemandMessage d,
+            final GridDhtPartitionsExchangeFuture exchFut
+        ) throws InterruptedException, IgniteCheckedException {
+            final GridDhtPartitionTopology top = cctx.dht().topology();
+
+            long timeout = GridDhtPartitionDemander.this.timeout.get();
+
+            d.timeout(timeout);
+            d.workerId(id);
+
+            final Set<Integer> missed = new HashSet<>();
+
+            final ConcurrentHashMap8<Integer, Boolean> remaining = new ConcurrentHashMap8<>();
+
+            for (int p : d.partitions())
+                remaining.put(p, false);
+
+            if (isCancelled() || topologyChanged())
+                return missed;
+
+            int threadCnt = cctx.config().getRebalanceThreadPoolSize(); // TODO: should be getRebalanceThreadPoolSize / assigns.count.
+
+            List<Set<Integer>> sParts = new ArrayList<>(threadCnt);
+
+            int cnt = 0;
+
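+            // Register an ordered supply-message handler and an (initially empty) partition set per rebalance thread.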
+            while (cnt < threadCnt) {
+                sParts.add(new HashSet<Integer>());
+
+                final int idx = cnt;
+
+                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId(), node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                    @Override public void apply(UUID id, GridDhtPartitionSupplyMessage m) {
+                        handleSupplyMessage(idx, new SupplyMessage(id, m), node, topVer, top,
+                            exchFut, missed, d, remaining);
+                    }
+                });
+
+                cnt++;
+            }
+
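+            // Spread the demanded partitions across the per-thread sets round-robin.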
+            Iterator<Integer> it = d.partitions().iterator();
+
+            cnt = 0;
+
+            while (it.hasNext()) {
+                sParts.get(cnt % threadCnt).add(it.next());
+
+                cnt++;
+            }
+
+            try {
+                cnt = 0;
+
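+                // Send an initial demand message per thread, each with its own partition subset and reply topic.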
+                while (cnt < threadCnt) {
+
+                    // Create copy.
+                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+
+                    initD.topic(topic(cnt, cctx.cacheId(), node.id()));
+
+                    try {
+                        if (logg && cctx.name().equals("cache"))
+                            System.out.println("D " + cnt + " initial Demand  " + cctx.localNode().id());
+
+                        cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send partition demand message to node: " + node.id(), e);
+                    }
+
+                    cnt++;
+                }
+
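+                // Poll until all partitions are handled, rebalancing is cancelled or the topology changes.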
+                do {
+                    U.sleep(1000); // TODO: improve.
+                }
+                while (!isCancelled() && !topologyChanged() && !remaining.isEmpty());
+
+                return missed;
+            }
+            finally {
+                cnt = 0;
+
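+                // Unregister the per-thread ordered handlers on exit.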
+                while (cnt < threadCnt) {
+                    cctx.io().removeOrderedHandler(topic(cnt, cctx.cacheId(), node.id()));
+
+                    cnt++;
+                }
+            }
+        }
+
+        boolean logg = false;
+
+        /**
+         * @param idx Ordered handler index.
+         * @param s Supply message.
+         * @param node Node.
+         * @param topVer Topology version.
+         * @param top Topology.
+         * @param exchFut Exchange future.
+         * @param missed Missed partitions.
+         * @param d Initial demand message.
+         * @param remaining Remaining partitions for this demand session.
+         */
+        private void handleSupplyMessage(
+            int idx,
+            SupplyMessage s,
+            ClusterNode node,
+            AffinityTopologyVersion topVer,
+            GridDhtPartitionTopology top,
+            GridDhtPartitionsExchangeFuture exchFut,
+            Set<Integer> missed,
+            GridDhtPartitionDemandMessage d,
+            ConcurrentHashMap8<Integer, Boolean> remaining) {
+
+            if (logg && cctx.name().equals("cache"))
+                System.out.println("D " + idx + " handled supply message " + cctx.localNode().id());
+
+            // Check that message was received from expected node.
+            if (!s.senderId().equals(node.id())) {
+                U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
+                    ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
+
+                return;
+            }
+
+            if (topologyChanged())
+                return;
+
+            if (log.isDebugEnabled())
+                log.debug("Received supply message: " + s);
+
+            GridDhtPartitionSupplyMessage supply = s.supply();
+
+            // Check whether there were class loading errors on unmarshal.
+            if (supply.classError() != null) {
+                if (log.isDebugEnabled())
+                    log.debug("Class got undeployed during preloading: " + supply.classError());
+
+                return;
+            }
+
+            // Preload.
+            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+                int p = e.getKey();
+
+                if (cctx.affinity().localNode(p, topVer)) {
+                    GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+                    assert part != null;
+
+                    if (part.state() == MOVING) {
+                        boolean reserved = part.reserve();
+
+                        assert reserved : "Failed to reserve partition [gridName=" +
+                            cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+                        part.lock();
+
+                        try {
+                            // Loop through all received entries and try to preload them.
+                            for (GridCacheEntryInfo entry : e.getValue().infos()) {
+                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Preloading is not permitted for entry due to " +
+                                            "evictions [key=" + entry.key() +
+                                            ", ver=" + entry.version() + ']');
+
+                                    continue;
+                                }
+                                try {
+                                    if (!preloadEntry(node, p, entry, topVer)) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Got entries for invalid partition during " +
+                                                "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+
+                                        break;
+                                    }
+                                }
+                                catch (IgniteCheckedException ex) {
+                                    cancel();
+
+                                    return;
+                                }
+                            }
+
+                            boolean last = supply.last().contains(p);
+
+                            // If message was last for this partition,
+                            // then we take ownership.
+                            if (last) {
+                                top.own(part); // TODO: close future?
+
+//                                if (logg && cctx.name().equals("cache"))
+//                                    System.out.println("D "+idx + " last "+ p +" "+ cctx.localNode().id());
+
+                                remaining.remove(p);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Finished rebalancing partition: " + part);
+
+                                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                                    preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+                                        exchFut.discoveryEvent());
+                            }
+                        }
+                        finally {
+                            part.unlock();
+                            part.release();
+                        }
+                    }
+                    else {
+                        remaining.remove(p);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+                    }
+                }
+                else {
+                    remaining.remove(p);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+                }
+            }
+
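+            // Partitions reported as missed are no longer expected from this supplier.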
+            for (Integer miss : s.supply().missed())
+                remaining.remove(miss);
+
+            // Only request partitions based on latest topology version.
+            for (Integer miss : s.supply().missed())
+                if (cctx.affinity().localNode(miss, topVer))
+                    missed.add(miss);
+
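+            // While partitions remain, acknowledge the supplier with an empty demand message to request the next batch.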
+            if (!remaining.isEmpty()) {
+                try {
+                    // Create copy.
+                    GridDhtPartitionDemandMessage nextD =
+                        new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
+
+                    nextD.topic(topic(idx, cctx.cacheId(), node.id()));
+
+                    // Send demand message.
+                    cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()),
+                        nextD, cctx.ioPolicy(), d.timeout());
+
+                    if (logg && cctx.name().equals("cache"))
+                        System.out.println("D " + idx + " ack  " + cctx.localNode().id());
+                }
+                catch (IgniteCheckedException ex) {
+                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
+
+                    cancel();
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            try {
+                int rebalanceOrder = cctx.config().getRebalanceOrder();
+
+                if (!CU.isMarshallerCache(cctx.name())) {
+                    if (log.isDebugEnabled())
+                        log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');
+
+                    try {
+                        cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
+                    }
+                    catch (IgniteInterruptedCheckedException ignored) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
+                                "[cacheName=" + cctx.name() + ']');
+
+                        return;
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
+                    }
+                }
+
+                if (rebalanceOrder > 0) {
+                    IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
+
+                    try {
+                        if (fut != null) {
+                            if (log.isDebugEnabled())
+                                log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() +
+                                    ", rebalanceOrder=" + rebalanceOrder + ']');
+
+                            fut.get();
+                        }
+                    }
+                    catch (IgniteInterruptedCheckedException ignored) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
+                                "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
+
+                        return;
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
+                    }
+                }
+
+                GridDhtPartitionsExchangeFuture exchFut = null;
+
+                boolean stopEvtFired = false;
+
+                while (!isCancelled()) {
+                    try {
+                        barrier.await();
+
+                        if (id == 0 && exchFut != null && !exchFut.dummy() &&
+                            cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED)) {
+
+                            if (!cctx.isReplicated() || !stopEvtFired) {
+                                preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+
+                                stopEvtFired = true;
+                            }
+                        }
+                    }
+                    catch (BrokenBarrierException ignore) {
+                        throw new InterruptedException("Demand worker stopped.");
+                    }
+
+                    // Sync up all demand threads at this step.
+                    GridDhtPreloaderAssignments assigns = null;
+
+                    while (assigns == null)
+                        assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
+
+                    demandLock.readLock().lock();
+
+                    try {
+                        exchFut = assigns.exchangeFuture();
+
+                        // Assignments are empty if preloading is disabled.
+                        if (assigns.isEmpty())
+                            continue;
+
+                        boolean resync = false;
+
+                        // While.
+                        // =====
+                        while (!isCancelled() && !topologyChanged() && !resync) {
+                            Collection<Integer> missed = new HashSet<>();
+
+                            // For.
+                            // ===
+                            for (ClusterNode node : assigns.keySet()) {
+                                if (topologyChanged() || isCancelled())
+                                    break; // For.
+
+                                GridDhtPartitionDemandMessage d = assigns.remove(node);
+
+                                // If another thread is already processing this message,
+                                // move to the next node.
+                                if (d == null)
+                                    continue; // For.
+
+                                try {
+                                    Set<Integer> set = demandFromNode(node, assigns.topologyVersion(), d, exchFut);
+
+                                    if (!set.isEmpty()) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" +
+                                                set + ']');
+
+                                        missed.addAll(set);
+                                    }
+                                }
+                                catch (IgniteInterruptedCheckedException e) {
+                                    throw e;
+                                }
+                                catch (ClusterTopologyCheckedException e) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
+                                            ", msg=" + e.getMessage() + ']');
+
+                                    resync = true;
+
+                                    break; // For.
+                                }
+                                catch (IgniteCheckedException e) {
+                                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+                                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
+                                }
+                            }
+
+                            // Process missed entries.
+                            if (!missed.isEmpty()) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Reassigning partitions that were missed: " + missed);
+
+                                assert exchFut.exchangeId() != null;
+
+                                cctx.shared().exchange().forceDummyExchange(true, exchFut);
+                            }
+                            else
+                                break; // While.
+                        }
+                    }
+                    finally {
+                        demandLock.readLock().unlock();
+
+                        syncFut.onWorkerDone(this);
+                    }
+
+                    cctx.shared().exchange().scheduleResendPartitions();
+                }
+            }
+            finally {
+                // Safety.
+                syncFut.onWorkerDone(this);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DemandWorker.class, this, "assignQ", assignQ, "super", super.toString());
+        }
+    }
+
+    /**
+     * Sets last exchange future.
+     *
+     * @param lastFut Last future to set.
+     */
+    void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
+        lastExchangeFut = lastFut;
+    }
+
+    /**
+     * @param exchFut Exchange future.
+     * @return Assignments of partitions to nodes.
+     */
+    GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
+        // No assignments for disabled preloader.
+        GridDhtPartitionTopology top = cctx.dht().topology();
+
+        if (!cctx.rebalanceEnabled())
+            return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+
+        int partCnt = cctx.affinity().partitions();
+
+        assert exchFut.forcePreload() || exchFut.dummyReassign() ||
+            exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
+            "Topology version mismatch [exchId=" + exchFut.exchangeId() +
+                ", topVer=" + top.topologyVersion() + ']';
+
+        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+
+        AffinityTopologyVersion topVer = assigns.topologyVersion();
+
+        for (int p = 0; p < partCnt; p++) {
+            if (cctx.shared().exchange().hasPendingExchange()) {
+                if (log.isDebugEnabled())
+                    log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
+                        exchFut.exchangeId());
+
+                break;
+            }
+
+            // If partition belongs to local node.
+            if (cctx.affinity().localNode(p, topVer)) {
+                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+                assert part != null;
+                assert part.id() == p;
+
+                if (part.state() != MOVING) {
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping partition assignment (state is not MOVING): " + part);
+
+                    continue; // For.
+                }
+
+                Collection<ClusterNode> picked = pickedOwners(p, topVer);
+
+                if (picked.isEmpty()) {
+                    top.own(part);
+
+                    if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+
+                        cctx.events().addPreloadEvent(p,
+                            EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
+                            discoEvt.type(), discoEvt.timestamp());
+                    }
+
+                    if (log.isDebugEnabled())
+                        log.debug("Owning partition as there are no other owners: " + part);
+                }
+                else {
+                    ClusterNode n = F.first(picked);
+
+                    GridDhtPartitionDemandMessage msg = assigns.get(n);
+
+                    if (msg == null) {
+                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+                            top.updateSequence(),
+                            exchFut.exchangeId().topologyVersion(),
+                            cctx.cacheId()));
+                    }
+
+                    msg.addPartition(p);
+                }
+            }
+        }
+
+        return assigns;
+    }
+
+    /**
+     * Future that completes once all demand workers have finished processing their current assignments.
+     */
+    private class SyncFuture extends GridFutureAdapter<Object> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Remaining workers. */
+        private Collection<DemandWorker> remaining;
+
+        /**
+         * @param workers List of workers.
+         */
+        private SyncFuture(Collection<DemandWorker> workers) {
+            assert workers.size() == poolSize();
+
+            remaining = Collections.synchronizedList(new LinkedList<>(workers));
+        }
+
+        /**
+         * @param w Worker who iterated through all partitions.
+         */
+        void onWorkerDone(DemandWorker w) {
+            if (isDone())
+                return;
+
+            if (remaining.remove(w))
+                if (log.isDebugEnabled())
+                    log.debug("Completed full partition iteration for worker [worker=" + w + ']');
+
+            if (remaining.isEmpty()) {
+                if (log.isDebugEnabled())
+                    log.debug("Completed sync future.");
+
+                onDone();
+            }
+        }
+    }
+
+    /**
+     * Supply message wrapper.
+     */
+    private static class SupplyMessage {
+        /** Sender ID. */
+        private UUID sndId;
+
+        /** Supply message. */
+        private GridDhtPartitionSupplyMessage supply;
+
+        /**
+         * Dummy constructor.
+         */
+        private SupplyMessage() {
+            // No-op.
+        }
+
+        /**
+         * @param sndId Sender ID.
+         * @param supply Supply message.
+         */
+        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
+            this.sndId = sndId;
+            this.supply = supply;
+        }
+
+        /**
+         * @return Sender ID.
+         */
+        UUID senderId() {
+            return sndId;
+        }
+
+        /**
+         * @return Message.
+         */
+        GridDhtPartitionSupplyMessage supply() {
+            return supply;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SupplyMessage.class, this);
+        }
+    }
+}
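
The ordered wait in DemandWorker.body() above is driven by per-cache rebalance settings. Below is a minimal,
illustrative sketch of how those settings would be supplied through the public configuration API, assuming the
CacheConfiguration setters setRebalanceOrder and setRebalanceThreadPoolSize from that release line; none of this
code is part of the commit itself.

    import org.apache.ignite.Ignite;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.configuration.CacheConfiguration;
    import org.apache.ignite.configuration.IgniteConfiguration;

    public class RebalanceConfigSketch {
        public static void main(String[] args) {
            // Caches with a smaller rebalance order finish rebalancing before
            // higher-ordered caches start; body() waits on orderedPreloadFuture(rebalanceOrder).
            CacheConfiguration<Integer, String> metaCfg = new CacheConfiguration<>("meta");

            metaCfg.setRebalanceOrder(1);

            CacheConfiguration<Integer, byte[]> dataCfg = new CacheConfiguration<>("data");

            dataCfg.setRebalanceOrder(2);

            // Per-cache rebalance thread pool size (number of demand/supply threads).
            dataCfg.setRebalanceThreadPoolSize(4);

            IgniteConfiguration cfg = new IgniteConfiguration();

            cfg.setCacheConfiguration(metaCfg, dataCfg);

            try (Ignite ignite = Ignition.start(cfg)) {
                // A joining node rebalances "meta" before "data".
            }
        }
    }

With such a setup, rebalancing of "data" only starts once "meta" has finished, which is the dependency that
body() observes through orderedPreloadFuture(rebalanceOrder).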


[8/8] incubator-ignite git commit: ignite-1093

Posted by av...@apache.org.
ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/50e188df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/50e188df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/50e188df

Branch: refs/heads/ignite-1093
Commit: 50e188df2d20e7fe889e26ce576b55d6192d8ab4
Parents: 6a733ef
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Tue Aug 11 19:20:14 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Tue Aug 11 19:20:14 2015 +0300

----------------------------------------------------------------------
 .../cache/distributed/dht/preloader/GridDhtPartitionDemander.java  | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50e188df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 30a04c0..f6a33c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -731,6 +731,8 @@ private class SyncFuture extends GridFutureAdapter<Object> {
 
             missed.clear();
 
+            cctx.shared().exchange().scheduleResendPartitions(); // TODO: Is it necessary?
+
             onDone();
         }
     }
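
The scheduleResendPartitions() call added here runs right before SyncFuture completes. For completeness, a hedged
sketch of how application code could wait for local rebalancing of a cache to finish, assuming the public
IgniteCache.rebalance() future API; none of these calls appear in this patch.

    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.Ignition;

    public class WaitForRebalanceSketch {
        public static void main(String[] args) {
            try (Ignite ignite = Ignition.start()) {
                IgniteCache<Integer, String> cache = ignite.getOrCreateCache("cache");

                // Ask the local node to rebalance this cache and block until it reports completion.
                cache.rebalance().get();
            }
        }
    }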