Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/05 17:31:54 UTC

incubator-ignite git commit: ignite-1093 Parallel supplyPool

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1093 8341c6d76 -> 4ed77e375


ignite-1093 Parallel supplyPool


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4ed77e37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4ed77e37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4ed77e37

Branch: refs/heads/ignite-1093
Commit: 4ed77e37529e34795a3b581010be3e0f35c8fe6e
Parents: 8341c6d
Author: Anton Vinogradov <av...@gridgain.com>
Authored: Wed Aug 5 18:31:34 2015 +0300
Committer: Anton Vinogradov <av...@gridgain.com>
Committed: Wed Aug 5 18:31:34 2015 +0300

----------------------------------------------------------------------
 .../preloader/GridDhtPartitionDemandPool.java   |  78 +--
 .../preloader/GridDhtPartitionSupplyPool.java   | 577 ++++++++++---------
 .../GridCacheMassiveRebalancingSelfTest.java    | 115 +++-
 3 files changed, 432 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
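
In short: the supply side no longer runs a dedicated pool of SupplyWorker threads draining a demand queue. It now registers an ordered message handler per partition and serves demand messages directly, parking its position in a SupplyContext after a few batches so the next demand message can resume from there. The demand side sends one ordered demand message per partition and keeps at most CacheConfiguration.getRebalanceThreadPoolSize() partitions in flight per supplier node. For illustration only (not part of this commit), a minimal cache configuration exercising the new knob, mirroring the updated self-test:

    CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>("cache");

    cacheCfg.setCacheMode(CacheMode.PARTITIONED);
    cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
    cacheCfg.setRebalanceBatchSize(100 * 1024); // Max size of a single supply message, in bytes.
    cacheCfg.setRebalanceThreadPoolSize(4);     // Max partitions demanded in parallel from each supplier.
    cacheCfg.setBackups(1);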


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ed77e37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index d9d871f..0e0bc01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -424,22 +424,6 @@ public class GridDhtPartitionDemandPool {
         }
 
         /**
-         * @param timeout Timed out value.
-         */
-        private void growTimeout(long timeout) {
-            long newTimeout = (long)(timeout * 1.5D);
-
-            // Account for overflow.
-            if (newTimeout < 0)
-                newTimeout = Long.MAX_VALUE;
-
-            // Grow by 50% only if another thread didn't do it already.
-            if (GridDhtPartitionDemandPool.this.timeout.compareAndSet(timeout, newTimeout))
-                U.warn(log, "Increased rebalancing message timeout from " + timeout + "ms to " +
-                    newTimeout + "ms.");
-        }
-
-        /**
          * @param pick Node picked for preloading.
          * @param p Partition.
          * @param entry Preloaded entry.
@@ -554,7 +538,7 @@ public class GridDhtPartitionDemandPool {
                 return missed;
 
             for (int p : d.partitions()) {
-                cctx.io().addOrderedHandler(topic(p, node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                cctx.io().addOrderedHandler(topic(p, topVer.topologyVersion()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
                     @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
                         handleSupplyMessage(new SupplyMessage(nodeId, msg), node, topVer, top, remaining,
                             exchFut, missed, d);
@@ -565,26 +549,25 @@ public class GridDhtPartitionDemandPool {
             try {
                 Iterator<Integer> it = remaining.keySet().iterator();
 
-                final int maxC = Runtime.getRuntime().availableProcessors() / 2; //Todo: make param
+                final int maxC = cctx.config().getRebalanceThreadPoolSize();
 
                 int sent = 0;
 
                 while (sent < maxC && it.hasNext()) {
                     int p = it.next();
 
-                    Collection<Integer> ps = Collections.singleton(p);
-
                     boolean res = remaining.replace(p, false, true);
 
                     assert res;
 
                     // Create copy.
-                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, ps);
+                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
 
-                    initD.topic(topic(p, node.id()));
+                    initD.topic(topic(p, topVer.topologyVersion()));
 
                     // Send initial demand message.
-                    cctx.io().send(node, initD, cctx.ioPolicy());
+                    cctx.io().sendOrderedMessage(node,
+                        GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
 
                     sent++;
                 }
@@ -598,17 +581,17 @@ public class GridDhtPartitionDemandPool {
             }
             finally {
                 for (int p : d.partitions())
-                    cctx.io().removeOrderedHandler(topic(p, node.id()));
+                    cctx.io().removeOrderedHandler(topic(p, topVer.topologyVersion()));
             }
         }
 
         /**
-         * @param p Partition.
-         * @param id remote node id.
+         * @param p Partition.
+         * @param topVer Topology version.
          * @return topic
          */
-        private Object topic(int p, UUID id) {
-            return TOPIC_CACHE.topic("Preloading", id, cctx.cacheId(), p);
+        private Object topic(int p, long topVer) {
+            return TOPIC_CACHE.topic("DemandPool" + topVer, cctx.cacheId(), p);//Todo topVer as long
         }
 
         /**
@@ -631,7 +614,7 @@ public class GridDhtPartitionDemandPool {
             Set<Integer> missed,
             GridDhtPartitionDemandMessage d) {
 
-            //Todo: check and remove
+            //Todo: check whether this is still needed and remove
             // Check that message was received from expected node.
             if (!s.senderId().equals(node.id())) {
                 U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
@@ -640,6 +623,9 @@ public class GridDhtPartitionDemandPool {
                 return;
             }
 
+            if (topologyChanged())
+                return;
+
             if (log.isDebugEnabled())
                 log.debug("Received supply message: " + s);
 
@@ -715,7 +701,26 @@ public class GridDhtPartitionDemandPool {
 
                                 remaining.remove(p);
 
-                                demandNextPartition(node, remaining, d);
+                                demandNextPartition(node, remaining, d, topVer);
+                            }
+                            else {
+                                try {
+                                    // Create copy.
+                                    GridDhtPartitionDemandMessage nextD =
+                                        new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
+
+                                    nextD.topic(topic(p, topVer.topologyVersion()));
+
+                                    // Send demand message.
+                                    cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()),
+                                        nextD, cctx.ioPolicy(), d.timeout());
+                                }
+                                catch (IgniteCheckedException ex) {
+                                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+                                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
+
+                                    cancel();
+                                }
                             }
                         }
                         finally {
@@ -751,24 +756,25 @@ public class GridDhtPartitionDemandPool {
          * @param node Node.
          * @param remaining Remaining.
          * @param d initial DemandMessage.
+         * @param topVer Topology version.
          */
         private void demandNextPartition(
             final ClusterNode node,
             final ConcurrentHashMap8<Integer, Boolean> remaining,
-            final GridDhtPartitionDemandMessage d
+            final GridDhtPartitionDemandMessage d,
+            final AffinityTopologyVersion topVer
         ) {
             try {
                 for (Integer p : remaining.keySet()) {
                     if (remaining.replace(p, false, true)) {
-                        Collection<Integer> nextPs = Collections.singleton(p);
-
                         // Create copy.
-                        GridDhtPartitionDemandMessage nextD = new GridDhtPartitionDemandMessage(d, nextPs);
+                        GridDhtPartitionDemandMessage nextD = new GridDhtPartitionDemandMessage(d, Collections.singleton(p));
 
-                        nextD.topic(topic(p, node.id()));
+                        nextD.topic(topic(p, topVer.topologyVersion()));
 
                         // Send demand message.
-                        cctx.io().send(node, nextD, cctx.ioPolicy());
+                        cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(p, cctx.cacheId()),
+                            nextD, cctx.ioPolicy(), d.timeout());
 
                         break;
                     }
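
The demander-side pattern in brief: register an ordered handler per partition (topics now keyed by topology version instead of node id), send an initial burst of at most getRebalanceThreadPoolSize() demand messages, and demand the next pending partition whenever one completes. Below is a standalone sketch of that bookkeeping (plain Java, not Ignite code; send(p) stands in for cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplyPool.topic(p, cacheId), ...)):

    import java.util.*;
    import java.util.concurrent.*;

    /** Sketch of the demand-side slot bookkeeping: false = pending, true = in flight. */
    public class DemandSketch {
        private final ConcurrentMap<Integer, Boolean> remaining = new ConcurrentHashMap<>();

        DemandSketch(Collection<Integer> parts) {
            for (int p : parts)
                remaining.put(p, false); // Not yet demanded.
        }

        /** Sends initial demand messages for up to maxParallel partitions. */
        void demandInitial(int maxParallel) {
            int sent = 0;

            for (Integer p : remaining.keySet()) {
                if (sent >= maxParallel)
                    break;

                if (remaining.replace(p, false, true)) {
                    send(p);

                    sent++;
                }
            }
        }

        /** Called when the last supply message for partition p has been handled. */
        void onPartitionDone(int p) {
            remaining.remove(p);

            // Demand the next pending partition, if any.
            for (Integer next : remaining.keySet()) {
                if (remaining.replace(next, false, true)) {
                    send(next);

                    break;
                }
            }
        }

        private void send(int p) {
            System.out.println("Demanding partition " + p); // Placeholder for an ordered demand message.
        }
    }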

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ed77e37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 42d6bb2..f10837a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -19,25 +19,22 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
+import org.jsr166.*;
 
 import java.io.*;
 import java.util.*;
-import java.util.concurrent.*;
 import java.util.concurrent.locks.*;
 
-import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.internal.GridTopic.*;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 
 /**
@@ -51,23 +48,20 @@ class GridDhtPartitionSupplyPool {
     private final IgniteLogger log;
 
     /** */
-    private final ReadWriteLock busyLock;
-
-    /** */
     private GridDhtPartitionTopology top;
 
     /** */
-    private final Collection<SupplyWorker> workers = new LinkedList<>();
-
-    /** */
-    private final BlockingQueue<DemandMessage> queue = new LinkedBlockingDeque<>();
-
-    /** */
     private final boolean depEnabled;
 
     /** Preload predicate. */
     private IgnitePredicate<GridCacheEntryInfo> preloadPred;
 
+    /** Supply context map. */
+    private ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
+
+    /** Done map. */
+    private ConcurrentHashMap8<T2, Boolean> doneMap = new ConcurrentHashMap8<>();//Todo: refactor
+
     /**
      * @param cctx Cache context.
      * @param busyLock Shutdown lock.
@@ -77,43 +71,42 @@ class GridDhtPartitionSupplyPool {
         assert busyLock != null;
 
         this.cctx = cctx;
-        this.busyLock = busyLock;
 
         log = cctx.logger(getClass());
 
         top = cctx.dht().topology();
 
         if (!cctx.kernalContext().clientNode()) {
-            int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
-
-            for (int i = 0; i < poolSize; i++)
-                workers.add(new SupplyWorker());
-
-            cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
-                @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
-                    processDemandMessage(id, m);
-                }
-            });
+            for (int p = 0; p < cctx.affinity().partitions(); p++)
+                cctx.io().addOrderedHandler(topic(p, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
+                    @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+                        processMessage(m, id);
+                    }
+                });
         }
 
         depEnabled = cctx.gridDeploy().enabled();
     }
 
     /**
+     * @param p Partition.
+     * @param id Cache id.
+     * @return topic
+     */
+    static Object topic(int p, int id) {
+        return TOPIC_CACHE.topic("SupplyPool", id, p);
+    }
+
+    /**
      *
      */
     void start() {
-        for (SupplyWorker w : workers)
-            new IgniteThread(cctx.gridName(), "preloader-supply-worker", w).start();
     }
 
     /**
      *
      */
     void stop() {
-        U.cancel(workers);
-        U.join(workers, log);
-
         top = null;
     }
 
@@ -127,164 +120,85 @@ class GridDhtPartitionSupplyPool {
     }
 
     /**
-     * @return Size of this thread pool.
+     * @param d Demand message.
+     * @param id Node uuid.
      */
-    int poolSize() {
-        return cctx.config().getRebalanceThreadPoolSize();
-    }
+    private void processMessage(GridDhtPartitionDemandMessage d, UUID id) {
+        assert d != null;
+        assert id != null;
 
-    /**
-     * @return {@code true} if entered to busy state.
-     */
-    private boolean enterBusy() {
-        if (busyLock.readLock().tryLock())
-            return true;
+        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
+            d.updateSequence(), cctx.cacheId());
 
-        if (log.isDebugEnabled())
-            log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId());
+        long preloadThrottle = cctx.config().getRebalanceThrottle();
 
-        return false;
-    }
+        long maxBatchesCnt = 3;//Todo: param
 
-    /**
-     * @param nodeId Sender node ID.
-     * @param d Message.
-     */
-    private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage d) {
-        if (!enterBusy())
-            return;
+        ClusterNode node = cctx.discovery().node(id);
+
+        boolean ack = false;
+
+        T2<UUID, Object> scId = new T2<>(id, d.topic());
 
         try {
-            if (cctx.rebalanceEnabled()) {
-                if (log.isDebugEnabled())
-                    log.debug("Received partition demand [node=" + nodeId + ", demand=" + d + ']');
+            SupplyContext sctx = scMap.remove(scId);
 
-                queue.offer(new DemandMessage(nodeId, d));
-            }
-            else
-                U.warn(log, "Received partition demand message when rebalancing is disabled (will ignore): " + d);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
+            if (doneMap.get(scId) != null)//Todo: refactor
+                return;
 
-    /**
-     *
-     */
-    private void leaveBusy() {
-        busyLock.readLock().unlock();
-    }
+            long bCnt = 0;
 
-    /**
-     * @param deque Deque to poll from.
-     * @param w Worker.
-     * @return Polled item.
-     * @throws InterruptedException If interrupted.
-     */
-    @Nullable private <T> T poll(BlockingQueue<T> deque, GridWorker w) throws InterruptedException {
-        assert w != null;
-
-        // There is currently a case where {@code interrupted}
-        // flag on a thread gets flipped during stop which causes the pool to hang.  This check
-        // will always make sure that interrupted flag gets reset before going into wait conditions.
-        // The true fix should actually make sure that interrupted flag does not get reset or that
-        // interrupted exception gets propagated. Until we find a real fix, this method should
-        // always work to make sure that there is no hanging during stop.
-        if (w.isCancelled())
-            Thread.currentThread().interrupt();
-
-        return deque.poll(2000, MILLISECONDS);
-    }
+            int phase = 0;
 
-    /**
-     * Supply work.
-     */
-    private class SupplyWorker extends GridWorker {
-        /** Hide worker logger and use cache logger. */
-        private IgniteLogger log = GridDhtPartitionSupplyPool.this.log;
+            if (sctx != null)
+                phase = sctx.phase;
 
-        /**
-         * Default constructor.
-         */
-        private SupplyWorker() {
-            super(cctx.gridName(), "preloader-supply-worker", GridDhtPartitionSupplyPool.this.log);
-        }
+            Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
 
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            while (!isCancelled()) {
-                DemandMessage msg = poll(queue, this);
+            while (sctx != null || partIt.hasNext()) {
+                int part = sctx != null ? sctx.part : partIt.next();
 
-                if (msg == null)
-                    continue;
+                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
 
-                ClusterNode node = cctx.discovery().node(msg.senderId());
+                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                    // Reply with partition of "-1" to let sender know that
+                    // this node is no longer an owner.
+                    s.missed(part);
 
-                if (node == null) {
                     if (log.isDebugEnabled())
-                        log.debug("Received message from non-existing node (will ignore): " + msg);
+                        log.debug("Requested partition is not owned by local node [part=" + part +
+                            ", demander=" + id + ']');
 
                     continue;
                 }
 
-                processMessage(msg, node);
-            }
-        }
+                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
 
-        /**
-         * @param msg Message.
-         * @param node Demander.
-         */
-        private void processMessage(DemandMessage msg, ClusterNode node) {
-            assert msg != null;
-            assert node != null;
-
-            GridDhtPartitionDemandMessage d = msg.message();
-
-            GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
-                d.updateSequence(), cctx.cacheId());
-
-            long preloadThrottle = cctx.config().getRebalanceThrottle();
-
-            boolean ack = false;
+                try {
+                    if (phase == 0 && cctx.isSwapOrOffheapEnabled()) {
+                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
 
-            try {
-                for (int part : d.partitions()) {
-                    GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
-
-                    if (loc == null || loc.state() != OWNING || !loc.reserve()) {
-                        // Reply with partition of "-1" to let sender know that
-                        // this node is no longer an owner.
-                        s.missed(part);
-
-                        if (log.isDebugEnabled())
-                            log.debug("Requested partition is not owned by local node [part=" + part +
-                                ", demander=" + msg.senderId() + ']');
-
-                        continue;
+                        cctx.swap().addOffHeapListener(part, swapLsnr);
+                        cctx.swap().addSwapListener(part, swapLsnr);
                     }
 
-                    GridCacheEntryInfoCollectSwapListener swapLsnr = null;
-
-                    try {
-                        if (cctx.isSwapOrOffheapEnabled()) {
-                            swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+                    boolean partMissing = false;
 
-                            cctx.swap().addOffHeapListener(part, swapLsnr);
-                            cctx.swap().addSwapListener(part, swapLsnr);
-                        }
+                    if (phase == 0)
+                        phase = 1;
 
-                        boolean partMissing = false;
+                    if (phase == 1) {
+                        Iterator<GridDhtCacheEntry> entIt = sctx != null ?
+                            (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();
 
-                        for (GridCacheEntryEx e : loc.entries()) {
+                        while (entIt.hasNext()) {
                             if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
                                 // Demander no longer needs this partition, so we send '-1' partition and move on.
                                 s.missed(part);
 
                                 if (log.isDebugEnabled())
                                     log.debug("Demanding node does not need requested partition [part=" + part +
-                                        ", nodeId=" + msg.senderId() + ']');
+                                        ", nodeId=" + id + ']');
 
                                 partMissing = true;
 
@@ -301,10 +215,21 @@ class GridDhtPartitionSupplyPool {
                                 if (preloadThrottle > 0)
                                     U.sleep(preloadThrottle);
 
-                                s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
-                                    cctx.cacheId());
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
+
+                                    swapLsnr = null;
+
+                                    return;
+                                }
+                                else {
+                                    s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                        cctx.cacheId());
+                                }
                             }
 
+                            GridCacheEntryEx e = entIt.next();
+
                             GridCacheEntryInfo info = e.info();
 
                             if (info != null && !info.isNew()) {
@@ -319,188 +244,228 @@ class GridDhtPartitionSupplyPool {
                         if (partMissing)
                             continue;
 
-                        if (cctx.isSwapOrOffheapEnabled()) {
-                            GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
-                                cctx.swap().iterator(part);
+                    }
 
-                            // Iterator may be null if space does not exist.
-                            if (iter != null) {
-                                try {
-                                    boolean prepared = false;
+                    if (phase == 1)
+                        phase = 2;
 
-                                    for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
-                                        if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                            // Demander no longer needs this partition,
-                                            // so we send '-1' partition and move on.
-                                            s.missed(part);
+                    if (phase == 2 && cctx.isSwapOrOffheapEnabled()) {
+                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = sctx != null ?
+                            (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
+                            cctx.swap().iterator(part);
 
-                                            if (log.isDebugEnabled())
-                                                log.debug("Demanding node does not need requested partition " +
-                                                    "[part=" + part + ", nodeId=" + msg.senderId() + ']');
+                        // Iterator may be null if space does not exist.
+                        if (iter != null) {
+                            try {
+                                boolean prepared = false;
 
-                                            partMissing = true;
+                                while (iter.hasNext()) {
+                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                        // Demander no longer needs this partition,
+                                        // so we send '-1' partition and move on.
+                                        s.missed(part);
 
-                                            break; // For.
-                                        }
+                                        if (log.isDebugEnabled())
+                                            log.debug("Demanding node does not need requested partition " +
+                                                "[part=" + part + ", nodeId=" + id + ']');
 
-                                        if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                            ack = true;
+                                        partMissing = true;
 
-                                            if (!reply(node, d, s))
-                                                return;
+                                        break; // For.
+                                    }
 
-                                            // Throttle preloading.
-                                            if (preloadThrottle > 0)
-                                                U.sleep(preloadThrottle);
+                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                        ack = true;
 
-                                            s = new GridDhtPartitionSupplyMessage(d.workerId(),
-                                                d.updateSequence(), cctx.cacheId());
-                                        }
+                                        if (!reply(node, d, s))
+                                            return;
 
-                                        GridCacheSwapEntry swapEntry = e.getValue();
+                                        // Throttle preloading.
+                                        if (preloadThrottle > 0)
+                                            U.sleep(preloadThrottle);
 
-                                        GridCacheEntryInfo info = new GridCacheEntryInfo();
+                                        if (++bCnt >= maxBatchesCnt) {
+                                            saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
 
-                                        info.keyBytes(e.getKey());
-                                        info.ttl(swapEntry.ttl());
-                                        info.expireTime(swapEntry.expireTime());
-                                        info.version(swapEntry.version());
-                                        info.value(swapEntry.value());
+                                            swapLsnr = null;
 
-                                        if (preloadPred == null || preloadPred.apply(info))
-                                            s.addEntry0(part, info, cctx);
+                                            return;
+                                        }
                                         else {
-                                            if (log.isDebugEnabled())
-                                                log.debug("Rebalance predicate evaluated to false (will not send " +
-                                                    "cache entry): " + info);
-
-                                            continue;
+                                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                                cctx.cacheId());
                                         }
+                                    }
 
-                                        // Need to manually prepare cache message.
-                                        if (depEnabled && !prepared) {
-                                            ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
-                                                cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
-                                                swapEntry.valueClassLoaderId() != null ?
-                                                    cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
-                                                    null;
+                                    Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
 
-                                            if (ldr == null)
-                                                continue;
+                                    GridCacheSwapEntry swapEntry = e.getValue();
 
-                                            if (ldr instanceof GridDeploymentInfo) {
-                                                s.prepare((GridDeploymentInfo)ldr);
+                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
 
-                                                prepared = true;
-                                            }
-                                        }
-                                    }
+                                    info.keyBytes(e.getKey());
+                                    info.ttl(swapEntry.ttl());
+                                    info.expireTime(swapEntry.expireTime());
+                                    info.version(swapEntry.version());
+                                    info.value(swapEntry.value());
+
+                                    if (preloadPred == null || preloadPred.apply(info))
+                                        s.addEntry0(part, info, cctx);
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Rebalance predicate evaluated to false (will not send " +
+                                                "cache entry): " + info);
 
-                                    if (partMissing)
                                         continue;
+                                    }
+
+                                    // Need to manually prepare cache message.
+                                    if (depEnabled && !prepared) {
+                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+                                            swapEntry.valueClassLoaderId() != null ?
+                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+                                                null;
+
+                                        if (ldr == null)
+                                            continue;
+
+                                        if (ldr instanceof GridDeploymentInfo) {
+                                            s.prepare((GridDeploymentInfo)ldr);
+
+                                            prepared = true;
+                                        }
+                                    }
                                 }
-                                finally {
-                                    iter.close();
-                                }
+
+                                if (partMissing)
+                                    continue;
+                            }
+                            finally {
+                                iter.close();
                             }
                         }
+                    }
 
-                        // Stop receiving promote notifications.
-                        if (swapLsnr != null) {
-                            cctx.swap().removeOffHeapListener(part, swapLsnr);
-                            cctx.swap().removeSwapListener(part, swapLsnr);
-                        }
+                    if (swapLsnr == null && sctx != null)
+                        swapLsnr = sctx.swapLsnr;
 
-                        if (swapLsnr != null) {
-                            Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+                    // Stop receiving promote notifications.
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
 
-                            swapLsnr = null;
+                    if (phase == 2)
+                        phase = 3;
 
-                            for (GridCacheEntryInfo info : entries) {
-                                if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                    // Demander no longer needs this partition,
-                                    // so we send '-1' partition and move on.
-                                    s.missed(part);
+                    if (phase == 3 && swapLsnr != null) {
+                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
 
-                                    if (log.isDebugEnabled())
-                                        log.debug("Demanding node does not need requested partition " +
-                                            "[part=" + part + ", nodeId=" + msg.senderId() + ']');
+                        swapLsnr = null;
 
-                                    // No need to continue iteration over swap entries.
-                                    break;
-                                }
+                        Iterator<GridCacheEntryInfo> lsnrIt = sctx != null ?
+                            (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();
+
+                        while (lsnrIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition,
+                                // so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition " +
+                                        "[part=" + part + ", nodeId=" + id + ']');
 
-                                if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                    ack = true;
+                                // No need to continue iteration over swap entries.
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                ack = true;
 
-                                    if (!reply(node, d, s))
-                                        return;
+                                if (!reply(node, d, s))
+                                    return;
 
-                                    s = new GridDhtPartitionSupplyMessage(d.workerId(),
-                                        d.updateSequence(),
+                                // Throttle preloading.
+                                if (preloadThrottle > 0)
+                                    U.sleep(preloadThrottle);
+
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
+
+                                    return;
+                                }
+                                else {
+                                    s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
                                         cctx.cacheId());
                                 }
-
-                                if (preloadPred == null || preloadPred.apply(info))
-                                    s.addEntry(part, info, cctx);
-                                else if (log.isDebugEnabled())
-                                    log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
-                                        info);
                             }
-                        }
 
-                        // Mark as last supply message.
-                        s.last(part);
+                            GridCacheEntryInfo info = lsnrIt.next();
 
-//                        if (ack) {
-//                            s.markAck();
-//
-//                            break; // Partition for loop.
-//                        }
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not send cache entry): " +
+                                    info);
+                        }
                     }
-                    finally {
-                        loc.release();
 
-                        if (swapLsnr != null) {
-                            cctx.swap().removeOffHeapListener(part, swapLsnr);
-                            cctx.swap().removeSwapListener(part, swapLsnr);
-                        }
+                    // Mark as last supply message.
+                    s.last(part);
+
+                    if (ack) {
+                        s.markAck();
+
+                        break; // Partition for loop.
                     }
                 }
+                finally {
+                    loc.release();
 
-                reply(node, d, s);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+                }
             }
+
+            reply(node, d, s);
+
+            doneMap.put(scId, true);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send partition supply message to node: " + id, e);
         }
+    }
 
-        /**
-         * @param n Node.
-         * @param d Demand message.
-         * @param s Supply message.
-         * @return {@code True} if message was sent, {@code false} if recipient left grid.
-         * @throws IgniteCheckedException If failed.
-         */
-        private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
-            throws IgniteCheckedException {
-            try {
-                if (log.isDebugEnabled())
-                    log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+    /**
+     * @param n Node.
+     * @param d Demand message.
+     * @param s Supply message.
+     * @return {@code True} if message was sent, {@code false} if recipient left grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+        throws IgniteCheckedException {
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
 
-                cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
 
-                return true;
-            }
-            catch (ClusterTopologyCheckedException ignore) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send partition supply message because node left grid: " + n.id());
+            return true;
+        }
+        catch (ClusterTopologyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
 
-                return false;
-            }
+            return false;
         }
     }
 
+
     /**
      * Demand message wrapper.
      */
@@ -542,4 +507,42 @@ class GridDhtPartitionSupplyPool {
             return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']';
         }
     }
+
+
+    /**
+     * @param t Supply context key (sender node id and topic).
+     * @param phase Phase.
+     * @param partIt Partition iterator.
+     * @param part Partition.
+     * @param entryIt Entry iterator.
+     * @param swapLsnr Swap listener.
+     */
+    private void saveSupplyContext(
+        T2 t,
+        int phase,
+        Iterator<Integer> partIt,
+        int part,
+        Iterator<?> entryIt,
+        GridCacheEntryInfoCollectSwapListener swapLsnr) {
+        scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
+    }
+
+    /**
+     * Supply context.
+     */
+    private static class SupplyContext {
+        /** Phase. */
+        private int phase;
+
+        /** Partition iterator. */
+        private Iterator<Integer> partIt;
+
+        /** Entry iterator. */
+        private Iterator<?> entryIt;
+
+        /** Swap listener. */
+        private GridCacheEntryInfoCollectSwapListener swapLsnr;
+
+        /** Partition. */
+        int part;
+
+        public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
+            GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
+            this.phase = phase;
+            this.partIt = partIt;
+            this.entryIt = entryIt;
+            this.swapLsnr = swapLsnr;
+            this.part = part;
+        }
+    }
 }
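
On the supply side the worker threads and the blocking queue are gone: every partition has an ordered handler, and processMessage() emits at most maxBatchesCnt (currently hard-coded to 3) supply batches before saving its phase, iterators and swap listener into scMap keyed by (sender id, topic), so the next demand message from the same demander resumes where it stopped. A standalone sketch of that park-and-resume pattern (plain Java, not Ignite code; names are illustrative):

    import java.util.*;
    import java.util.concurrent.*;

    /** Sketch of a resumable supplier: process an iterator in bounded bursts. */
    public class SupplySketch {
        /** Saved positions, keyed by (sender id, topic) in the real code; a plain string here. */
        private final ConcurrentMap<String, Iterator<Integer>> ctxMap = new ConcurrentHashMap<>();

        /** Supply batches emitted per demand message (maxBatchesCnt in the commit). */
        private static final int MAX_BATCHES = 3;

        /** Entries per batch (the real code cuts batches by message size). */
        private static final int BATCH_SIZE = 100;

        /** Handles one demand message from the given demander. */
        void onDemand(String demander, List<Integer> entries) {
            // Resume from the saved position if the previous message stopped early.
            Iterator<Integer> it = ctxMap.remove(demander);

            if (it == null)
                it = entries.iterator();

            int batches = 0;
            int inBatch = 0;

            while (it.hasNext()) {
                supply(demander, it.next());

                if (++inBatch >= BATCH_SIZE) {
                    inBatch = 0;

                    if (++batches >= MAX_BATCHES) {
                        // Park the iterator and return the IO thread; the demander's next
                        // ordered message will continue from exactly this point.
                        ctxMap.put(demander, it);

                        return;
                    }
                }
            }
        }

        private void supply(String demander, Integer entry) {
            System.out.println("Supplying " + entry + " to " + demander); // Placeholder for adding to a supply message.
        }
    }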

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ed77e37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
index e90b7af..11ea8f6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
@@ -20,12 +20,13 @@ package org.apache.ignite.internal.processors.cache;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.junits.common.*;
 
+import java.util.concurrent.atomic.*;
+
 /**
  *
  */
@@ -38,6 +39,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
     /** cache name. */
     protected static String CACHE_NAME_DHT = "cache";
 
+    /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
         return Long.MAX_VALUE;
     }
@@ -51,10 +53,14 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
 
+        if (getTestGridName(3).equals(gridName))
+            iCfg.setClientMode(true);
+
         cacheCfg.setName(CACHE_NAME_DHT);
         cacheCfg.setCacheMode(CacheMode.PARTITIONED);
-        //cacheCfg.setRebalanceBatchSize(1024);
+        cacheCfg.setRebalanceBatchSize(100 * 1024);
         cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheCfg.setRebalanceThreadPoolSize(4);
         //cacheCfg.setRebalanceTimeout(1000000);
         cacheCfg.setBackups(1);
 
@@ -63,11 +69,9 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
     }
 
     /**
-     * @throws Exception
+     * @param ignite Ignite.
      */
-    public void testMassiveRebalancing() throws Exception {
-        Ignite ignite = startGrid(0);
-
+    private void generateData(Ignite ignite) {
         try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) {
             for (int i = 0; i < TEST_SIZE; i++) {
                 if (i % 1_000_000 == 0)
@@ -76,12 +80,34 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
                 stmr.addData(i, i);
             }
         }
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void checkData(Ignite ignite) throws IgniteCheckedException {
+        for (int i = 0; i < TEST_SIZE; i++) {
+            if (i % 1_000_000 == 0)
+                log.info("Checked " + i / 1_000_000 + "m entries.");
+
+            assert ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : "key " + i + " does not match";
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMassiveRebalancing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        generateData(ignite);
 
         log.info("Preloading started.");
 
         long start = System.currentTimeMillis();
 
-        // startGrid(1);
+        //startGrid(1);
 
         startGrid(2);
 
@@ -89,19 +115,78 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
 
         stopGrid(0);
 
-       // Thread.sleep(10000);
+        //Thread.sleep(20000);
 
-       // stopGrid(1);
+        //stopGrid(1);
 
-        for (int i = 0; i < TEST_SIZE; i++) {
-            if (i % 1_000_000 == 0)
-                log.info("Checked " + i / 1_000_000 + "m entries.");
-
-            assert grid(2).cachex(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i + " does not match";
-        }
+        checkData(grid(2));
 
         log.info("Spend " + spend + " seconds to preload entries.");
 
         stopAllGrids();
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOpPerSecRebalancingTest() throws Exception {
+        startGrid(0);
+
+        final AtomicBoolean cancelled = new AtomicBoolean(false);
+
+        generateData(grid(0));
+
+        startGrid(1);
+        startGrid(2);
+        startGrid(3);
+
+        Thread t = new Thread(new Runnable() {
+            @Override public void run() {
+
+                long spend = 0;
+
+                long ops = 0;
+
+                while (!cancelled.get()) {
+                    try {
+                        long start = System.currentTimeMillis();
+
+                        int size = 1000;
+
+                        for (int i = 0; i < size; i++)
+                            grid(3).cachex(CACHE_NAME_DHT).remove(i);
+
+                        for (int i = 0; i < size; i++)
+                            grid(3).cachex(CACHE_NAME_DHT).put(i, i);
+
+                        spend += System.currentTimeMillis() - start;
+
+                        ops += size * 2;
+                    }
+                    catch (IgniteCheckedException e) {
+                        e.printStackTrace();
+                    }
+
+                    log.info("Ops. per ms: " + ops / spend);
+                }
+            }
+        });
+        t.start();
+
+        stopGrid(0);
+        startGrid(0);
+
+        stopGrid(0);
+        startGrid(0);
+
+        stopGrid(0);
+        startGrid(0);
+
+        cancelled.set(true);
+        t.join();
+
+        checkData(grid(3));
+
+        //stopAllGrids();
+    }
 }
\ No newline at end of file