Posted to commits@ignite.apache.org by av...@apache.org on 2015/07/31 17:19:15 UTC

incubator-ignite git commit: ignite-1093 Parallel demandPool

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1093 [created] 5e3d8008c


ignite-1093 Parallel demandPool


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5e3d8008
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5e3d8008
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5e3d8008

Branch: refs/heads/ignite-1093
Commit: 5e3d8008c3e6d92be0374aadb47312d6c2a9f49a
Parents: 5288b2d
Author: Anton Vinogradov <av...@gridgain.com>
Authored: Fri Jul 31 18:17:25 2015 +0300
Committer: Anton Vinogradov <av...@gridgain.com>
Committed: Fri Jul 31 18:17:25 2015 +0300

----------------------------------------------------------------------
 .../preloader/GridDhtPartitionDemandPool.java   | 410 +++++++++----------
 .../preloader/GridDhtPartitionSupplyPool.java   |  10 +-
 .../dht/preloader/GridDhtPreloader.java         |   5 -
 .../GridCacheMassiveRebalancingSelfTest.java    | 107 +++++
 4 files changed, 302 insertions(+), 230 deletions(-)
----------------------------------------------------------------------
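
In outline, the patch replaces the demand worker's single ordered topic and blocking SupplyMessage queue with one ordered topic per partition: partitions are tracked in a jsr166 ConcurrentHashMap8 of claimed flags, an initial window of demand messages (half the available processors, per the Todo-marked constant) is sent up front, and each completed partition triggers the next demand from the supply handler. A minimal sketch of that claim-and-chain pattern, using java.util.concurrent.ConcurrentHashMap; the class and sendDemand() are hypothetical stand-ins for the Ignite internals (cctx.io().send(...), GridDhtPartitionDemandMessage), not the committed code:

    import java.util.concurrent.ConcurrentHashMap;

    /** Sketch of the claim-and-chain demand pattern introduced by this commit. */
    class DemandFlowSketch {
        /** Partition id -> claimed flag (false = not yet demanded). */
        private final ConcurrentHashMap<Integer, Boolean> remaining = new ConcurrentHashMap<>();

        /** Sends initial demands for up to maxC partitions in parallel. */
        void start(Iterable<Integer> partitions) {
            for (int p : partitions)
                remaining.put(p, false);

            // The patch uses half the available processors as the window.
            int maxC = Runtime.getRuntime().availableProcessors() / 2;

            int sent = 0;

            for (int p : remaining.keySet()) {
                if (sent == maxC)
                    break;

                if (remaining.replace(p, false, true)) { // Atomically claim the partition.
                    sendDemand(p);

                    sent++;
                }
            }
        }

        /** Called from the per-partition supply handler once a partition is owned. */
        void onPartitionDone(int p) {
            remaining.remove(p);

            for (int next : remaining.keySet()) {
                if (remaining.replace(next, false, true)) { // Claim the next unclaimed one.
                    sendDemand(next);

                    break; // One new demand per completed partition.
                }
            }
        }

        private void sendDemand(int p) {
            // Hypothetical: copy the demand message for partition p, set its
            // per-partition topic, and send it to the supplier node.
        }
    }
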


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e3d8008/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index a6e6c4d..50c5e90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -27,7 +27,6 @@ import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.timeout.*;
-import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -36,6 +35,7 @@ import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.thread.*;
 import org.jetbrains.annotations.*;
+import org.jsr166.*;
 
 import java.util.*;
 import java.util.concurrent.*;
@@ -187,16 +187,6 @@ public class GridDhtPartitionDemandPool {
     }
 
     /**
-     * Wakes up demand workers when new exchange future was added.
-     */
-    void onExchangeFutureAdded() {
-        synchronized (dmdWorkers) {
-            for (DemandWorker w : dmdWorkers)
-                w.addMessage(DUMMY_TOP);
-        }
-    }
-
-    /**
      * Force preload.
      */
     void forcePreload() {
@@ -339,11 +329,8 @@ public class GridDhtPartitionDemandPool {
             assert assigns != null;
 
             synchronized (dmdWorkers) {
-                for (DemandWorker w : dmdWorkers) {
+                for (DemandWorker w : dmdWorkers)
                     w.addAssignments(assigns);
-
-                    w.addMessage(DUMMY_TOP);
-                }
             }
         }
         else if (delay > 0) {
@@ -403,13 +390,6 @@ public class GridDhtPartitionDemandPool {
         /** Partition-to-node assignments. */
         private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
 
-        /** Message queue. */
-        private final LinkedBlockingDeque<SupplyMessage> msgQ =
-            new LinkedBlockingDeque<>();
-
-        /** Counter. */
-        private long cntr;
-
         /** Hide worker logger and use cache logger instead. */
         private IgniteLogger log = GridDhtPartitionDemandPool.this.log;
 
@@ -444,23 +424,6 @@ public class GridDhtPartitionDemandPool {
         }
 
         /**
-         * @param msg Message.
-         */
-        private void addMessage(SupplyMessage msg) {
-            if (!enterBusy())
-                return;
-
-            try {
-                assert dummyTopology(msg) || msg.supply().workerId() == id;
-
-                msgQ.offer(msg);
-            }
-            finally {
-                leaveBusy();
-            }
-        }
-
-        /**
          * @param timeout Timed out value.
          */
         private void growTimeout(long timeout) {
@@ -558,14 +521,6 @@ public class GridDhtPartitionDemandPool {
         }
 
         /**
-         * @param idx Unique index for this topic.
-         * @return Topic for partition.
-         */
-        public Object topic(long idx) {
-            return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
-        }
-
-        /**
          * @param node Node to demand from.
          * @param topVer Topology version.
          * @param d Demand message.
@@ -576,243 +531,258 @@ public class GridDhtPartitionDemandPool {
          * @throws IgniteCheckedException If failed to send message.
          */
         private Set<Integer> demandFromNode(
-            ClusterNode node,
+            final ClusterNode node,
             final AffinityTopologyVersion topVer,
-            GridDhtPartitionDemandMessage d,
-            GridDhtPartitionsExchangeFuture exchFut
+            final GridDhtPartitionDemandMessage d,
+            final GridDhtPartitionsExchangeFuture exchFut
         ) throws InterruptedException, IgniteCheckedException {
-            GridDhtPartitionTopology top = cctx.dht().topology();
+            final GridDhtPartitionTopology top = cctx.dht().topology();
 
-            cntr++;
+            long timeout = GridDhtPartitionDemandPool.this.timeout.get();
 
-            d.topic(topic(cntr));
+            d.timeout(timeout);
             d.workerId(id);
 
-            Set<Integer> missed = new HashSet<>();
+            final Set<Integer> missed = new HashSet<>();
 
-            // Get the same collection that will be sent in the message.
-            Collection<Integer> remaining = d.partitions();
+            final ConcurrentHashMap8<Integer, Boolean> remaining = new ConcurrentHashMap8<>();
 
-            // Drain queue before processing a new node.
-            drainQueue();
+            for (int p : d.partitions())
+                remaining.put(p, false);
 
             if (isCancelled() || topologyChanged())
                 return missed;
 
-            cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
-                    addMessage(new SupplyMessage(nodeId, msg));
-                }
-            });
+            for (int p : d.partitions()) {
+                cctx.io().addOrderedHandler(topic(p, node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+                    @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
+                        handleSupplyMessage(new SupplyMessage(nodeId, msg), node, topVer, top, remaining,
+                            exchFut, missed, d);
+                    }
+                });
+            }
 
             try {
-                boolean retry;
-
-                // DoWhile.
-                // =======
-                do {
-                    retry = false;
-
-                    // Create copy.
-                    d = new GridDhtPartitionDemandMessage(d, remaining);
-
-                    long timeout = GridDhtPartitionDemandPool.this.timeout.get();
-
-                    d.timeout(timeout);
+                Iterator<Integer> it = remaining.keySet().iterator();
 
-                    if (log.isDebugEnabled())
-                        log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
-
-                    // Send demand message.
-                    cctx.io().send(node, d, cctx.ioPolicy());
-
-                    // While.
-                    // =====
-                    while (!isCancelled() && !topologyChanged()) {
-                        SupplyMessage s = poll(msgQ, timeout, this);
-
-                        // If timed out.
-                        if (s == null) {
-                            if (msgQ.isEmpty()) { // Safety check.
-                                U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
-                                    " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
-                                    " configuration properties).");
+                final int maxC = Runtime.getRuntime().availableProcessors() / 2; //Todo: make param
 
-                                growTimeout(timeout);
+                int sent = 0;
 
-                                // Ordered listener was removed if timeout expired.
-                                cctx.io().removeOrderedHandler(d.topic());
+                while (sent < maxC && it.hasNext()) {
+                    int p = it.next();
 
-                                // Must create copy to be able to work with IO manager thread local caches.
-                                d = new GridDhtPartitionDemandMessage(d, remaining);
+                    Collection<Integer> ps = new ArrayList<>(1);
 
-                                // Create new topic.
-                                d.topic(topic(++cntr));
+                    boolean res = remaining.replace(p, false, true);
 
-                                // Create new ordered listener.
-                                cctx.io().addOrderedHandler(d.topic(),
-                                    new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                                        @Override public void apply(UUID nodeId,
-                                            GridDhtPartitionSupplyMessage msg) {
-                                            addMessage(new SupplyMessage(nodeId, msg));
-                                        }
-                                    });
-
-                                // Resend message with larger timeout.
-                                retry = true;
-
-                                break; // While.
-                            }
-                            else
-                                continue; // While.
-                        }
+                    assert res;
 
-                        // If topology changed.
-                        if (dummyTopology(s)) {
-                            if (topologyChanged())
-                                break; // While.
-                            else
-                                continue; // While.
-                        }
+                    ps.add(p);
 
-                        // Check that message was received from expected node.
-                        if (!s.senderId().equals(node.id())) {
-                            U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
-                                ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
+                    // Create copy.
+                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, ps);
 
-                            continue; // While.
-                        }
+                    initD.topic(topic(p, node.id()));
 
-                        if (log.isDebugEnabled())
-                            log.debug("Received supply message: " + s);
+                    // Send initial demand message.
+                    cctx.io().send(node, initD, cctx.ioPolicy());
 
-                        GridDhtPartitionSupplyMessage supply = s.supply();
+                    sent++;
+                }
 
-                        // Check whether there were class loading errors on unmarshal
-                        if (supply.classError() != null) {
-                            if (log.isDebugEnabled())
-                                log.debug("Class got undeployed during preloading: " + supply.classError());
+                do {
+                    U.sleep(1000);//Todo: improve
+                }
+                while (!isCancelled() && !topologyChanged() && !remaining.isEmpty());
 
-                            retry = true;
+                return missed;
+            }
+            finally {
+                for (int p : d.partitions())
+                    cctx.io().removeOrderedHandler(topic(p, node.id()));
+            }
+        }
 
-                            // Quit preloading.
-                            break;
-                        }
+        /**
+         * @param p Partition.
+         * @param id Remote node ID.
+         * @return Topic for partition.
+         */
+        private Object topic(int p, UUID id) {
+            return TOPIC_CACHE.topic("Preloading", id, cctx.cacheId(), p);
+        }
 
-                        // Preload.
-                        for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
-                            int p = e.getKey();
+        /**
+         * @param s Supply message.
+         * @param node Node.
+         * @param topVer Topology version.
+         * @param top Topology.
+         * @param remaining Remaining.
+         * @param exchFut Exchange future.
+         * @param missed Missed.
+         * @param d Initial demand message.
+         */
+        private void handleSupplyMessage(
+            SupplyMessage s,
+            ClusterNode node,
+            AffinityTopologyVersion topVer,
+            GridDhtPartitionTopology top,
+            ConcurrentHashMap8<Integer, Boolean> remaining,
+            GridDhtPartitionsExchangeFuture exchFut,
+            Set<Integer> missed,
+            GridDhtPartitionDemandMessage d) {
+
+            //Todo: check and remove
+            // Check that message was received from expected node.
+            if (!s.senderId().equals(node.id())) {
+                U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
+                    ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
 
-                            if (cctx.affinity().localNode(p, topVer)) {
-                                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+                return;
+            }
 
-                                assert part != null;
+            if (log.isDebugEnabled())
+                log.debug("Received supply message: " + s);
 
-                                if (part.state() == MOVING) {
-                                    boolean reserved = part.reserve();
+            GridDhtPartitionSupplyMessage supply = s.supply();
 
-                                    assert reserved : "Failed to reserve partition [gridName=" +
-                                        cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+            // Check whether there were class loading errors on unmarshal
+            if (supply.classError() != null) {
+                if (log.isDebugEnabled())
+                    log.debug("Class got undeployed during preloading: " + supply.classError());
 
-                                    part.lock();
+                return;
+            }
 
-                                    try {
-                                        Collection<Integer> invalidParts = new GridLeanSet<>();
+            assert supply.infos().entrySet().size() == 1;//Todo: remove after supply message refactoring
 
-                                        // Loop through all received entries and try to preload them.
-                                        for (GridCacheEntryInfo entry : e.getValue().infos()) {
-                                            if (!invalidParts.contains(p)) {
-                                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
-                                                    if (log.isDebugEnabled())
-                                                        log.debug("Preloading is not permitted for entry due to " +
-                                                            "evictions [key=" + entry.key() +
-                                                            ", ver=" + entry.version() + ']');
+            // Preload.
+            for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {//todo:only one partition (supply refactoring)
+                int p = e.getKey();
 
-                                                    continue;
-                                                }
+                if (cctx.affinity().localNode(p, topVer)) {
+                    GridDhtLocalPartition part = top.localPartition(p, topVer, true);
 
-                                                if (!preloadEntry(node, p, entry, topVer)) {
-                                                    invalidParts.add(p);
+                    assert part != null;
 
-                                                    if (log.isDebugEnabled())
-                                                        log.debug("Got entries for invalid partition during " +
-                                                            "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
-                                                }
-                                            }
-                                        }
+                    if (part.state() == MOVING) {
+                        boolean reserved = part.reserve();
 
-                                        boolean last = supply.last().contains(p);
+                        assert reserved : "Failed to reserve partition [gridName=" +
+                            cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
 
-                                        // If message was last for this partition,
-                                        // then we take ownership.
-                                        if (last) {
-                                            remaining.remove(p);
+                        part.lock();
 
-                                            top.own(part);
+                        try {
+                            // Loop through all received entries and try to preload them.
+                            for (GridCacheEntryInfo entry : e.getValue().infos()) {
+                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Preloading is not permitted for entry due to " +
+                                            "evictions [key=" + entry.key() +
+                                            ", ver=" + entry.version() + ']');
 
-                                            if (log.isDebugEnabled())
-                                                log.debug("Finished rebalancing partition: " + part);
+                                    continue;
+                                }
+                                try {
+                                    if (!preloadEntry(node, p, entry, topVer)) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Got entries for invalid partition during " +
+                                                "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
 
-                                            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
-                                                preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
-                                                    exchFut.discoveryEvent());
-                                        }
-                                    }
-                                    finally {
-                                        part.unlock();
-                                        part.release();
+                                        break;
                                     }
                                 }
-                                else {
-                                    remaining.remove(p);
+                                catch (IgniteCheckedException ex) {
+                                    cancel();
 
-                                    if (log.isDebugEnabled())
-                                        log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+                                    return;
                                 }
                             }
-                            else {
-                                remaining.remove(p);
 
-                                if (log.isDebugEnabled())
-                                    log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
-                            }
-                        }
+                            boolean last = supply.last().contains(p);//Todo: refactor as boolean "last"
 
-                        remaining.removeAll(s.supply().missed());
+                            // If message was last for this partition,
+                            // then we take ownership.
+                            if (last) {
+                                top.own(part);
 
-                        // Only request partitions based on latest topology version.
-                        for (Integer miss : s.supply().missed())
-                            if (cctx.affinity().localNode(miss, topVer))
-                                missed.add(miss);
+                                if (log.isDebugEnabled())
+                                    log.debug("Finished rebalancing partition: " + part);
 
-                        if (remaining.isEmpty())
-                            break; // While.
+                                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                                    preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+                                        exchFut.discoveryEvent());
 
-                        if (s.supply().ack()) {
-                            retry = true;
+                                remaining.remove(p);
 
-                            break;
+                                demandNextPartition(node, remaining, d);
+                            }
                         }
+                        finally {
+                            part.unlock();
+                            part.release();
+                        }
+                    }
+                    else {
+                        remaining.remove(p);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
                     }
                 }
-                while (retry && !isCancelled() && !topologyChanged());
+                else {
+                    remaining.remove(p);
 
-                return missed;
-            }
-            finally {
-                cctx.io().removeOrderedHandler(d.topic());
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+                }
             }
+
+            for (Integer miss : s.supply().missed()) // Todo: miss as param, not collection
+                remaining.remove(miss);
+
+            // Only request partitions based on latest topology version.
+            for (Integer miss : s.supply().missed())
+                if (cctx.affinity().localNode(miss, topVer))
+                    missed.add(miss);
         }
 
         /**
-         * @throws InterruptedException If interrupted.
+         * @param node Node.
+         * @param remaining Remaining.
+         * @param d Initial demand message.
          */
-        private void drainQueue() throws InterruptedException {
-            while (!msgQ.isEmpty()) {
-                SupplyMessage msg = msgQ.take();
+        private void demandNextPartition(
+            final ClusterNode node,
+            final ConcurrentHashMap8<Integer, Boolean> remaining,
+            final GridDhtPartitionDemandMessage d
+        ) {
+            try {
+                for (Integer p : remaining.keySet()) {
+                    if (remaining.replace(p, false, true)) {
+                        Collection<Integer> nextPs = new ArrayList<>(1);
 
-                if (log.isDebugEnabled())
-                    log.debug("Drained supply message: " + msg);
+                        nextPs.add(p);
+
+                        // Create copy.
+                        GridDhtPartitionDemandMessage nextD = new GridDhtPartitionDemandMessage(d, nextPs);
+
+                        nextD.topic(topic(p, node.id()));
+
+                        // Send demand message.
+                        cctx.io().send(node, nextD, cctx.ioPolicy());
+
+                        break;
+                    }
+                }
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+                    "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
+
+                cancel();
             }
         }
 
@@ -980,7 +950,7 @@ public class GridDhtPartitionDemandPool {
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
+            return S.toString(DemandWorker.class, this, "assignQ", assignQ, "super", super.toString());
         }
     }
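
Note how the reworked demandFromNode() registers one ordered handler per partition before sending anything and removes every handler in a finally block, so a cancelled or failed rebalance cannot leak listeners. A compact sketch of that lifecycle; MessageBus and topic(...) are hypothetical stand-ins for cctx.io() and TOPIC_CACHE.topic("Preloading", nodeId, cacheId, p):

    import java.util.UUID;
    import java.util.function.BiConsumer;

    /** Sketch of the per-partition ordered-handler lifecycle. */
    class HandlerLifecycleSketch {
        interface MessageBus {
            void addOrderedHandler(Object topic, BiConsumer<UUID, Object> handler);
            void removeOrderedHandler(Object topic);
        }

        void demand(MessageBus io, UUID nodeId, int cacheId, int[] parts) {
            // Register every per-partition handler up front.
            for (int p : parts) {
                io.addOrderedHandler(topic(nodeId, cacheId, p), (sender, msg) -> {
                    // Hypothetical: delegate to handleSupplyMessage(...) for this partition.
                });
            }

            try {
                // Send the initial window of demands, then wait; the patch polls
                // with U.sleep(1000) until the remaining map is empty.
            }
            finally {
                for (int p : parts)
                    io.removeOrderedHandler(topic(nodeId, cacheId, p)); // No leaked handlers.
            }
        }

        private Object topic(UUID nodeId, int cacheId, int p) {
            return "Preloading-" + nodeId + '-' + cacheId + '-' + p; // Hypothetical topic key.
        }
    }
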
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e3d8008/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 13cfef3..42d6bb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -452,11 +452,11 @@ class GridDhtPartitionSupplyPool {
                         // Mark as last supply message.
                         s.last(part);
 
-                        if (ack) {
-                            s.markAck();
-
-                            break; // Partition for loop.
-                        }
+//                        if (ack) {
+//                            s.markAck();
+//
+//                            break; // Partition for loop.
+//                        }
                     }
                     finally {
                         loc.release();
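
With the ack branch commented out above, the supplier streams every batch for a demanded partition and marks it last in one pass, rather than breaking out of the partition loop to wait for an acknowledged retry round trip. The shape of the resulting loop, sketched with hypothetical stand-ins for the supply pool internals:

    import java.util.Collections;
    import java.util.List;

    /** Sketch of the supplier loop once the ack-based break is disabled. */
    class SupplierLoopSketch {
        void supply(List<Integer> demandedParts) {
            for (int part : demandedParts) {
                reserve(part);

                try {
                    for (Object batch : batchesOf(part))
                        send(batch); // Stream every batch for the partition.

                    markLast(part); // s.last(part): demander takes ownership on this message.

                    // Removed by this commit:
                    // if (ack) { s.markAck(); break; }
                }
                finally {
                    release(part);
                }
            }
        }

        private void reserve(int p) { /* hypothetical partition reservation */ }
        private void release(int p) { /* hypothetical partition release */ }
        private Iterable<Object> batchesOf(int p) { return Collections.emptyList(); }
        private void send(Object batch) { /* hypothetical ordered message send */ }
        private void markLast(int p) { /* hypothetical s.last(p) */ }
    }
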

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e3d8008/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index a43ebe2..fbcbc37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -244,11 +244,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onExchangeFutureAdded() {
-        demandPool.onExchangeFutureAdded();
-    }
-
-    /** {@inheritDoc} */
     @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
         demandPool.updateLastExchangeFuture(lastFut);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e3d8008/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
new file mode 100644
index 0000000..e90b7af
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
@@ -0,0 +1,107 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ *
+ */
+public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    private static int TEST_SIZE = 1_024_000;
+
+    /** Cache name. */
+    protected static String CACHE_NAME_DHT = "cache";
+
+    @Override protected long getTestTimeout() {
+        return Long.MAX_VALUE;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
+        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+
+        cacheCfg.setName(CACHE_NAME_DHT);
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+        //cacheCfg.setRebalanceBatchSize(1024);
+        cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        //cacheCfg.setRebalanceTimeout(1000000);
+        cacheCfg.setBackups(1);
+
+        iCfg.setCacheConfiguration(cacheCfg);
+        return iCfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMassiveRebalancing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) {
+            for (int i = 0; i < TEST_SIZE; i++) {
+                if (i % 1_000_000 == 0)
+                    log.info("Prepared " + i / 1_000_000 + "m entries.");
+
+                stmr.addData(i, i);
+            }
+        }
+
+        log.info("Preloading started.");
+
+        long start = System.currentTimeMillis();
+
+        // startGrid(1);
+
+        startGrid(2);
+
+        long spend = (System.currentTimeMillis() - start) / 1000;
+
+        stopGrid(0);
+
+       // Thread.sleep(10000);
+
+       // stopGrid(1);
+
+        for (int i = 0; i < TEST_SIZE; i++) {
+            if (i % 1_000_000 == 0)
+                log.info("Checked " + i / 1_000_000 + "m entries.");
+
+            assert grid(2).cachex(CACHE_NAME_DHT).get(i).equals(i) : "Key " + i + " does not match.";
+        }
+
+        log.info("Spend " + spend + " seconds to preload entries.");
+
+        stopAllGrids();
+    }
+}
\ No newline at end of file
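
As a usage note, the new self-test should be runnable on its own through standard Maven Surefire test selection from the repository root; the module path and flags below are an assumption, not taken from the commit:

    mvn test -Dtest=GridCacheMassiveRebalancingSelfTest -pl modules/core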