You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/14 18:38:28 UTC

[34/34] incubator-ignite git commit: ignite-1093

ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/64319443
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/64319443
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/64319443

Branch: refs/heads/ignite-1093
Commit: 64319443ab55aa4a0fc4c56182c774dec8446d48
Parents: 50d32b3
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Fri Aug 14 16:29:31 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Fri Aug 14 16:29:31 2015 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |  27 ++
 .../communication/GridIoMessageFactory.java     |   7 +-
 .../processors/cache/GridCacheIoManager.java    |   8 +
 .../dht/preloader/GridDhtPartitionDemander.java | 156 ++++---
 .../dht/preloader/GridDhtPartitionSupplier.java |  25 +-
 .../GridDhtPartitionSupplyMessageV2.java        | 423 +++++++++++++++++++
 .../GridCacheMassiveRebalancingSelfTest.java    | 210 ---------
 ...ridCacheMassiveRebalancingAsyncSelfTest.java |  37 ++
 ...GridCacheMassiveRebalancingSyncSelfTest.java | 252 +++++++++++
 9 files changed, 864 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 3ad0f01..a19e136 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -57,6 +57,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Default rebalance timeout (ms).*/
     public static final long DFLT_REBALANCE_TIMEOUT = 10000;
 
+    /** Default rebalance batches count. */
+    public static final long DFLT_REBALANCE_BATCHES_COUNT = 3;
+
     /** Time in milliseconds to wait between rebalance messages to avoid overloading CPU. */
     public static final long DFLT_REBALANCE_THROTTLE = 0;
 
@@ -240,6 +243,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Off-heap memory size. */
     private long offHeapMaxMem = DFLT_OFFHEAP_MEMORY;
 
+    /** Rebalance batches count. */
+    private long rebalanceBatchesCount = DFLT_REBALANCE_BATCHES_COUNT;
+
     /** */
     private boolean swapEnabled = DFLT_SWAP_ENABLED;
 
@@ -1751,6 +1757,27 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     }
 
     /**
+     * To gain better rebalancing performance supplier node can provide mode than one batch at start and provide
+     * one new to each next demand request.
+     *
+     * Gets number of batches generated by supply node at rebalancing start.
+     *
+     * @return
+     */
+    public long getRebalanceBatchesCount() {
+        return rebalanceBatchesCount;
+    }
+
+    /**
+     * Sets number of batches generated by supply node at rebalancing start.
+     *
+     * @param rebalanceBatchesCnt batches count.
+     */
+    public void setRebalanceBatchesCount(long rebalanceBatchesCnt) {
+        this.rebalanceBatchesCount = rebalanceBatchesCnt;
+    }
+
+    /**
      * Gets cache store session listener factories.
      *
      * @return Cache store session listener factories.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 7fe8da8..7ddbfb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -600,7 +600,12 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            // [-3..112] - this
+            case 113:
+                msg = new GridDhtPartitionSupplyMessageV2();
+
+                break;
+
+            // [-3..113] - this
             // [120..123] - DR
             // [-4..-22] - SQL
             default:

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 84e4dc2..da55f7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -503,6 +503,14 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             break;
 
+            case 113: {
+                GridDhtPartitionSupplyMessageV2 req = (GridDhtPartitionSupplyMessageV2)msg;
+
+                U.error(log, "Supply message v2 cannot be unmarshalled.", req.classError());
+            }
+
+            break;
+
             default:
                 throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
                     + msg + "]");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 16f7a61..262ccb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -72,9 +72,6 @@ public class GridDhtPartitionDemander {
     /** Last exchange future. */
     private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
 
-    /** Assignments. */
-    private volatile GridDhtPreloaderAssignments assigns;
-
     /**
      * @param cctx Cache context.
      * @param busyLock Shutdown lock.
@@ -95,8 +92,8 @@ public class GridDhtPartitionDemander {
             for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) {
                 final int idx = cnt;
 
-                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                    @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessage m) {
+                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionSupplyMessageV2>() {
+                    @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessageV2 m) {
                         enterBusy();
 
                         try {
@@ -110,7 +107,7 @@ public class GridDhtPartitionDemander {
             }
         }
 
-        syncFut = new SyncFuture();
+        syncFut = new SyncFuture(null);
 
         if (!enabled)
             // Calling onDone() immediately since preloading is disabled.
@@ -282,13 +279,15 @@ public class GridDhtPartitionDemander {
         if (delay == 0 || force) {
             assert assigns != null;
 
-            AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+            AffinityTopologyVersion topVer = assigns.topologyVersion();
 
-            if (this.assigns != null) {
+            if (syncFut.isInited()) {
                 syncFut.get();
 
-                syncFut = new SyncFuture();
+                syncFut = new SyncFuture(assigns);
             }
+            else
+                syncFut.init(assigns);
 
             if (assigns.isEmpty() || topologyChanged(topVer)) {
                 syncFut.onDone();
@@ -296,28 +295,30 @@ public class GridDhtPartitionDemander {
                 return;
             }
 
-            this.assigns = assigns;
-
             for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
                 GridDhtPartitionDemandMessage d = e.getValue();
 
                 d.timeout(cctx.config().getRebalanceTimeout());
                 d.workerId(0);//old api support.
 
-                ClusterNode node = e.getKey();
+                final ClusterNode node = e.getKey();
 
                 final long start = U.currentTimeMillis();
 
                 final CacheConfiguration cfg = cctx.config();
 
+                final AffinityTopologyVersion top = d.topologyVersion();
+
                 if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
                     U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
-                        ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + "]");
-
-                    syncFut.listen(new CI1<Object>() {
-                        @Override public void apply(Object t) {
-                            U.log(log, "Completed rebalancing [cache=" + cctx.name() + ", mode="
-                                + cfg.getRebalanceMode() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
+                        ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
+
+                    syncFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                        @Override public void apply(IgniteInternalFuture<Boolean> t) {
+                            Boolean cancelled = ((SyncFuture)t).cancelled();
+                            U.log(log, (cancelled ? "Cancelled" : "Completed") + " rebalancing [cache=" + cctx.name() + ", mode="
+                                + cfg.getRebalanceMode() + ", from node=" + node.id() + ", topology=" + top +
+                                ", time=" + (U.currentTimeMillis() - start) + " ms]");
                         }
                     });
                 }
@@ -394,7 +395,7 @@ public class GridDhtPartitionDemander {
      * @param c Partitions.
      * @return String representation of partitions list.
      */
-    private String partitionsList(Collection<Integer> c){
+    private String partitionsList(Collection<Integer> c) {
         LinkedList<Integer> s = new LinkedList<>(c);
 
         Collections.sort(s);
@@ -446,21 +447,19 @@ public class GridDhtPartitionDemander {
     private void handleSupplyMessage(
         int idx,
         final UUID id,
-        final GridDhtPartitionSupplyMessage supply) {
-        ClusterNode node = cctx.node(id);
-
-        assert node != null;
-
-        GridDhtPartitionDemandMessage d = assigns.get(node);
-
-        AffinityTopologyVersion topVer = d.topologyVersion();
+        final GridDhtPartitionSupplyMessageV2 supply) {
+        AffinityTopologyVersion topVer = supply.topologyVersion();
 
         if (topologyChanged(topVer)) {
-            syncFut.cancel(id);
+            syncFut.onCancel(id, topVer);
 
             return;
         }
 
+        ClusterNode node = cctx.node(id);
+
+        assert node != null;
+
         if (log.isDebugEnabled())
             log.debug("Received supply message: " + supply);
 
@@ -469,15 +468,13 @@ public class GridDhtPartitionDemander {
             if (log.isDebugEnabled())
                 log.debug("Class got undeployed during preloading: " + supply.classError());
 
-            syncFut.cancel(id);
+            syncFut.onCancel(id, topVer);
 
             return;
         }
 
         final GridDhtPartitionTopology top = cctx.dht().topology();
 
-        GridDhtPartitionsExchangeFuture exchFut = assigns.exchangeFuture();
-
         try {
 
             // Preload.
@@ -524,14 +521,10 @@ public class GridDhtPartitionDemander {
                             if (last) {
                                 top.own(part);
 
-                                syncFut.onPartitionDone(id, p);
+                                syncFut.onPartitionDone(id, p, topVer);
 
                                 if (log.isDebugEnabled())
                                     log.debug("Finished rebalancing partition: " + part);
-
-                                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
-                                    preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
-                                        exchFut.discoveryEvent());
                             }
                         }
                         finally {
@@ -540,14 +533,14 @@ public class GridDhtPartitionDemander {
                         }
                     }
                     else {
-                        syncFut.onPartitionDone(id, p);
+                        syncFut.onPartitionDone(id, p, topVer);
 
                         if (log.isDebugEnabled())
                             log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
                     }
                 }
                 else {
-                    syncFut.onPartitionDone(id, p);
+                    syncFut.onPartitionDone(id, p, topVer);
 
                     if (log.isDebugEnabled())
                         log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
@@ -557,35 +550,40 @@ public class GridDhtPartitionDemander {
             // Only request partitions based on latest topology version.
             for (Integer miss : supply.missed())
                 if (cctx.affinity().localNode(miss, topVer))
-                    syncFut.onMissedPartition(id, miss);
+                    syncFut.onMissedPartition(id, miss, topVer);
 
             for (Integer miss : supply.missed())
-                syncFut.onPartitionDone(id, miss);
+                syncFut.onPartitionDone(id, miss, topVer);
 
             if (!syncFut.isDone()) {
 
-                // Create copy.
-                GridDhtPartitionDemandMessage nextD =
-                    new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
+                GridDhtPartitionDemandMessage d = syncFut.getDemandMessage(topVer, node);
+
+                if (d != null) {
+
+                    // Create copy.
+                    GridDhtPartitionDemandMessage nextD =
+                        new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
 
-                nextD.topic(topic(idx, cctx.cacheId()));
+                    nextD.topic(topic(idx, cctx.cacheId()));
 
-                // Send demand message.
-                cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()),
-                    nextD, cctx.ioPolicy(), d.timeout());
+                    // Send demand message.
+                    cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()),
+                        nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+                }
             }
         }
         catch (ClusterTopologyCheckedException e) {
             if (log.isDebugEnabled())
                 log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
                     ", msg=" + e.getMessage() + ']');
-            syncFut.cancel(id);
+            syncFut.onCancel(id, topVer);
         }
         catch (IgniteCheckedException ex) {
             U.error(log, "Failed to receive partitions from node (rebalancing will not " +
-                "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
+                "fully finish) [node=" + node.id() + ", msg=" + supply + ']', ex);
 
-            syncFut.cancel(id);
+            syncFut.onCancel(id, topVer);
         }
     }
 
@@ -687,7 +685,7 @@ public class GridDhtPartitionDemander {
     /**
      *
      */
-    private class SyncFuture extends GridFutureAdapter<Object> {
+    public class SyncFuture extends GridFutureAdapter<Boolean> {
         /** */
         private static final long serialVersionUID = 1L;
 
@@ -695,32 +693,74 @@ public class GridDhtPartitionDemander {
 
         private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
 
-        public void append(UUID nodeId, Collection<Integer> parts) {
+        /** Assignments. */
+        private volatile GridDhtPreloaderAssignments assigns;
+
+        private volatile boolean cancelled = false;
+
+        SyncFuture(GridDhtPreloaderAssignments assigns) {
+            this.assigns = assigns;
+        }
+
+        public AffinityTopologyVersion topologyVersion() {
+            return assigns != null ? assigns.topologyVersion() : null;
+        }
+
+        void init(
+            GridDhtPreloaderAssignments assigns) {
+            this.assigns = assigns;
+        }
+
+        boolean isInited() {
+            return assigns != null;
+        }
+
+        void append(UUID nodeId, Collection<Integer> parts) {
             remaining.put(nodeId, parts);
 
             missed.put(nodeId, new GridConcurrentHashSet<Integer>());
         }
 
-        void cancel(UUID nodeId) {
-            if (isDone())
+        GridDhtPartitionDemandMessage getDemandMessage(AffinityTopologyVersion topVer, ClusterNode node) {
+            if (!topVer.equals(assigns.topologyVersion()))
+                return null;
+
+            return assigns.get(node);
+        }
+
+        void onCancel(UUID nodeId, AffinityTopologyVersion topVer) {
+            if (isDone() || !topVer.equals(assigns.topologyVersion()))
                 return;
 
             remaining.remove(nodeId);
 
+            cancelled = true;
+
             checkIsDone();
         }
 
-        void onMissedPartition(UUID nodeId, int p) {
+        boolean cancelled() {
+            return cancelled;
+        }
+
+        void onMissedPartition(UUID nodeId, int p, AffinityTopologyVersion topVer) {
+            if (isDone() || !topVer.equals(assigns.topologyVersion()))
+                return;
+
             if (missed.get(nodeId) == null)
                 missed.put(nodeId, new GridConcurrentHashSet<Integer>());
 
             missed.get(nodeId).add(p);
         }
 
-        void onPartitionDone(UUID nodeId, int p) {
-            if (isDone())
+        void onPartitionDone(UUID nodeId, int p, AffinityTopologyVersion topVer) {
+            if (isDone() || !topVer.equals(assigns.topologyVersion()))
                 return;
 
+            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+                    assigns.exchangeFuture().discoveryEvent());
+
             Collection<Integer> parts = remaining.get(nodeId);
 
             parts.remove(p);
@@ -758,7 +798,7 @@ public class GridDhtPartitionDemander {
 
                 cctx.shared().exchange().scheduleResendPartitions();//TODO: Is in necessary?
 
-                onDone();
+                onDone(cancelled);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index b948fbd..c496f8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -170,8 +170,8 @@ class GridDhtPartitionSupplier {
         if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
             return;
 
-        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
-            d.updateSequence(), cctx.cacheId());
+        GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(d.workerId(),
+            d.updateSequence(), cctx.cacheId(), d.topologyVersion());
 
         long preloadThrottle = cctx.config().getRebalanceThrottle();
 
@@ -180,12 +180,13 @@ class GridDhtPartitionSupplier {
         T2<UUID, Object> scId = new T2<>(id, d.topic());
 
         try {
-            SupplyContext sctx = scMap.remove(scId);
-
             if (!d.partitions().isEmpty()) {//Only initial request contains partitions.
                 doneMap.remove(scId);
+                scMap.remove(scId);
             }
 
+            SupplyContext sctx = scMap.remove(scId);
+
             if (doneMap.get(scId) != null)
                 return;
 
@@ -195,7 +196,7 @@ class GridDhtPartitionSupplier {
 
             boolean newReq = true;
 
-            long maxBatchesCnt = 3;//Todo: param
+            long maxBatchesCnt = cctx.config().getRebalanceBatchesCount();
 
             if (sctx != null) {
                 phase = sctx.phase;
@@ -273,8 +274,8 @@ class GridDhtPartitionSupplier {
                                     return;
                                 }
                                 else {
-                                    s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
-                                        cctx.cacheId());
+                                    s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+                                        cctx.cacheId(), d.topologyVersion());
                                 }
                             }
 
@@ -340,8 +341,8 @@ class GridDhtPartitionSupplier {
                                             return;
                                         }
                                         else {
-                                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
-                                                cctx.cacheId());
+                                            s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+                                                cctx.cacheId(), d.topologyVersion());
                                         }
                                     }
 
@@ -443,8 +444,8 @@ class GridDhtPartitionSupplier {
                                     return;
                                 }
                                 else {
-                                    s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
-                                        cctx.cacheId());
+                                    s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+                                        cctx.cacheId(), d.topologyVersion());
                                 }
                             }
 
@@ -491,7 +492,7 @@ class GridDhtPartitionSupplier {
      * @return {@code True} if message was sent, {@code false} if recipient left grid.
      * @throws IgniteCheckedException If failed.
      */
-    private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+    private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessageV2 s)
         throws IgniteCheckedException {
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
new file mode 100644
index 0000000..93d0db6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -0,0 +1,423 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Partition supply message.
+ */
+public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements GridCacheDeployable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Worker ID. */
+    private int workerId = -1;
+
+    /** Update sequence. */
+    private long updateSeq;
+
+    /** Acknowledgement flag. */
+    private boolean ack;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Partitions that have been fully sent. */
+    @GridDirectCollection(int.class)
+    private Collection<Integer> last;
+
+    /** Partitions which were not found. */
+    @GridToStringInclude
+    @GridDirectCollection(int.class)
+    private Collection<Integer> missed;
+
+    /** Entries. */
+    @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
+    private Map<Integer, CacheEntryInfoCollection> infos = new HashMap<>();
+
+    /** Message size. */
+    @GridDirectTransient
+    private int msgSize;
+
+    /**
+     * @param workerId Worker ID.
+     * @param updateSeq Update sequence for this node.
+     * @param cacheId Cache ID.
+     */
+    GridDhtPartitionSupplyMessageV2(int workerId, long updateSeq, int cacheId, AffinityTopologyVersion topVer) {
+        assert workerId >= 0;
+        assert updateSeq > 0;
+
+        this.cacheId = cacheId;
+        this.updateSeq = updateSeq;
+        this.workerId = workerId;
+        this.topVer = topVer;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridDhtPartitionSupplyMessageV2() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean allowForStartup() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean ignoreClassErrors() {
+        return true;
+    }
+
+    /**
+     * @return Worker ID.
+     */
+    int workerId() {
+        return workerId;
+    }
+
+    /**
+     * @return Update sequence.
+     */
+    long updateSequence() {
+        return updateSeq;
+    }
+
+    /**
+     * Marks this message for acknowledgment.
+     */
+    void markAck() {
+        ack = true;
+    }
+
+    /**
+     * @return Acknowledgement flag.
+     */
+    boolean ack() {
+        return ack;
+    }
+
+    /**
+     * @return Topology version for which demand message is sent.
+     */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Flag to indicate last message for partition.
+     */
+    Collection<Integer> last() {
+        return last == null ? Collections.<Integer>emptySet() : last;
+    }
+
+    /**
+     * @param p Partition which was fully sent.
+     */
+    void last(int p) {
+        if (last == null)
+            last = new HashSet<>();
+
+        if (last.add(p)) {
+            msgSize += 4;
+
+            // If partition is empty, we need to add it.
+            if (!infos.containsKey(p)) {
+                CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection();
+
+                infoCol.init();
+
+                infos.put(p, infoCol);
+            }
+        }
+    }
+
+    /**
+     * @param p Missed partition.
+     */
+    void missed(int p) {
+        if (missed == null)
+            missed = new HashSet<>();
+
+        if (missed.add(p))
+            msgSize += 4;
+    }
+
+    /**
+     * @return Missed partitions.
+     */
+    Collection<Integer> missed() {
+        return missed == null ? Collections.<Integer>emptySet() : missed;
+    }
+
+    /**
+     * @return Entries.
+     */
+    Map<Integer, CacheEntryInfoCollection> infos() {
+        return infos;
+    }
+
+    /**
+     * @return Message size.
+     */
+    int messageSize() {
+        return msgSize;
+    }
+
+    /**
+     * @param p Partition.
+     * @param info Entry to add.
+     * @param ctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+        assert info != null;
+
+        marshalInfo(info, ctx);
+
+        msgSize += info.marshalledSize(ctx);
+
+        CacheEntryInfoCollection infoCol = infos.get(p);
+
+        if (infoCol == null) {
+            msgSize += 4;
+
+            infos.put(p, infoCol = new CacheEntryInfoCollection());
+
+            infoCol.init();
+        }
+
+        infoCol.add(info);
+    }
+
+    /**
+     * @param p Partition.
+     * @param info Entry to add.
+     * @param ctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+        assert info != null;
+        assert (info.key() != null || info.keyBytes() != null);
+        assert info.value() != null;
+
+        // Need to call this method to initialize info properly.
+        marshalInfo(info, ctx);
+
+        msgSize += info.marshalledSize(ctx);
+
+        CacheEntryInfoCollection infoCol = infos.get(p);
+
+        if (infoCol == null) {
+            msgSize += 4;
+
+            infos.put(p, infoCol = new CacheEntryInfoCollection());
+
+            infoCol.init();
+        }
+
+        infoCol.add(info);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
+
+        for (CacheEntryInfoCollection col : infos().values()) {
+            List<GridCacheEntryInfo>  entries = col.infos();
+
+            for (int i = 0; i < entries.size(); i++)
+                entries.get(i).unmarshal(cacheCtx, ldr);
+        }
+    }
+
+    /**
+     * @return Number of entries in message.
+     */
+    public int size() {
+        return infos.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeBoolean("ack", ack))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeLong("updateSeq", updateSeq))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeInt("workerId", workerId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                ack = reader.readBoolean("ack");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                last = reader.readCollection("last", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                missed = reader.readCollection("missed", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                updateSeq = reader.readLong("updateSeq");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                workerId = reader.readInt("workerId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 113;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 10;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtPartitionSupplyMessageV2.class, this,
+            "size", size(),
+            "parts", infos.keySet(),
+            "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
deleted file mode 100644
index 0771509..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.util.concurrent.atomic.*;
-
-/**
- *
- */
-public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest {
-    /** */
-    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    private static int TEST_SIZE = 1_024_000;
-
-    /** cache name. */
-    protected static String CACHE_NAME_DHT = "cache";
-
-    /** {@inheritDoc} */
-    @Override protected long getTestTimeout() {
-        return Long.MAX_VALUE;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration iCfg = super.getConfiguration(gridName);
-
-        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
-
-        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
-        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
-
-        if (getTestGridName(3).equals(gridName))
-            iCfg.setClientMode(true);
-
-        cacheCfg.setName(CACHE_NAME_DHT);
-        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
-        //cacheCfg.setRebalanceBatchSize(1024);
-        cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
-        cacheCfg.setRebalanceThreadPoolSize(4);
-        //cacheCfg.setRebalanceTimeout(1000000);
-        cacheCfg.setBackups(1);
-
-        iCfg.setCacheConfiguration(cacheCfg);
-        return iCfg;
-    }
-
-    /**
-     * @param ignite Ignite.
-     */
-    private void generateData(Ignite ignite) {
-        try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) {
-            for (int i = 0; i < TEST_SIZE; i++) {
-                if (i % 1_000_000 == 0)
-                    log.info("Prepared " + i / 1_000_000 + "m entries.");
-
-                stmr.addData(i, i);
-            }
-        }
-    }
-
-    /**
-     * @param ignite Ignite.
-     * @throws IgniteCheckedException
-     */
-    private void checkData(Ignite ignite) throws IgniteCheckedException {
-        for (int i = 0; i < TEST_SIZE; i++) {
-            if (i % 1_000_000 == 0)
-                log.info("Checked " + i / 1_000_000 + "m entries.");
-
-            assert ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i + " does not match";
-        }
-    }
-
-    /**
-     * @throws Exception
-     */
-    public void testMassiveRebalancing() throws Exception {
-        Ignite ignite = startGrid(0);
-
-        generateData(ignite);
-
-        log.info("Preloading started.");
-
-        long start = System.currentTimeMillis();
-
-        startGrid(1);
-
-        startGrid(2);
-
-        long spend = (System.currentTimeMillis() - start) / 1000;
-
-        IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
-        IgniteInternalFuture f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
-
-        stopGrid(0);
-
-        //TODO: refactor to get futures by topology
-        while (f1 == ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture() ||
-            f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
-            U.sleep(100);
-
-        ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
-        ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
-
-        f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
-
-        stopGrid(1);
-
-        while (f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
-            U.sleep(100);
-
-        ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
-
-        checkData(grid(2));
-
-        log.info("Spend " + spend + " seconds to preload entries.");
-
-        stopAllGrids();
-    }
-
-    /**
-     * @throws Exception
-     */
-    public void testOpPerSecRebalancingTest() throws Exception {
-        startGrid(0);
-
-        final AtomicBoolean cancelled = new AtomicBoolean(false);
-
-        generateData(grid(0));
-
-        startGrid(1);
-        startGrid(2);
-        startGrid(3);
-
-        Thread t = new Thread(new Runnable() {
-            @Override public void run() {
-
-                long spend = 0;
-
-                long ops = 0;
-
-                while (!cancelled.get()) {
-                    try {
-                        long start = System.currentTimeMillis();
-
-                        int size = 1000;
-
-                        for (int i = 0; i < size; i++)
-                            grid(3).cachex(CACHE_NAME_DHT).remove(i);
-
-                        for (int i = 0; i < size; i++)
-                            grid(3).cachex(CACHE_NAME_DHT).put(i, i);
-
-                        spend += System.currentTimeMillis() - start;
-
-                        ops += size * 2;
-                    }
-                    catch (IgniteCheckedException e) {
-                        e.printStackTrace();
-                    }
-
-                    log.info("Ops. per ms: " + ops / spend);
-                }
-            }
-        });
-        t.start();
-
-        stopGrid(0);
-        startGrid(0);
-
-        stopGrid(0);
-        startGrid(0);
-
-        stopGrid(0);
-        startGrid(0);
-
-        cancelled.set(true);
-        t.join();
-
-        checkData(grid(3));
-
-        //stopAllGrids();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
new file mode 100644
index 0000000..8bcd6d1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
@@ -0,0 +1,37 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+
+/**
+ *
+ */
+public class GridCacheMassiveRebalancingAsyncSelfTest extends GridCacheMassiveRebalancingSyncSelfTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        CacheConfiguration cacheCfg = iCfg.getCacheConfiguration()[0];
+
+        cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
+
+        return iCfg;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
new file mode 100644
index 0000000..cd12954
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
@@ -0,0 +1,252 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    private static int TEST_SIZE = 1_024_000;
+
+    /** cache name. */
+    protected static String CACHE_NAME_DHT = "cache";
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return Long.MAX_VALUE;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
+        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+
+        if (getTestGridName(3).equals(gridName))
+            iCfg.setClientMode(true);
+
+        cacheCfg.setName(CACHE_NAME_DHT);
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+        //cacheCfg.setRebalanceBatchSize(1024);
+        //cacheCfg.setRebalanceBatchesCount(1);
+        cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheCfg.setRebalanceThreadPoolSize(4);
+        //cacheCfg.setRebalanceTimeout(1000000);
+        cacheCfg.setBackups(1);
+
+        iCfg.setCacheConfiguration(cacheCfg);
+        return iCfg;
+    }
+
+    /**
+     * @param ignite Ignite.
+     */
+    protected void generateData(Ignite ignite) {
+        try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) {
+            for (int i = 0; i < TEST_SIZE; i++) {
+                if (i % 1_000_000 == 0)
+                    log.info("Prepared " + i / 1_000_000 + "m entries.");
+
+                stmr.addData(i, i);
+            }
+        }
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @throws IgniteCheckedException
+     */
+    protected void checkData(Ignite ignite) throws IgniteCheckedException {
+        for (int i = 0; i < TEST_SIZE; i++) {
+            if (i % 1_000_000 == 0)
+                log.info("Checked " + i / 1_000_000 + "m entries.");
+
+            assert ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i + " does not match";
+        }
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testSimpleRebalancing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        generateData(ignite);
+
+        log.info("Preloading started.");
+
+        long start = System.currentTimeMillis();
+
+        startGrid(1);
+
+        IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+
+        f1.get();
+
+        long spend = (System.currentTimeMillis() - start) / 1000;
+
+        stopGrid(0);
+
+        checkData(grid(1));
+
+        log.info("Spend " + spend + " seconds to preload entries.");
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testComplexRebalancing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        generateData(ignite);
+
+        log.info("Preloading started.");
+
+        long start = System.currentTimeMillis();
+
+        startGrid(1);
+        startGrid(2);
+
+        IgniteInternalFuture f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+        f2.get();
+
+        IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+        while (!((GridDhtPartitionDemander.SyncFuture)f1).topologyVersion().equals(((GridDhtPartitionDemander.SyncFuture)f2).topologyVersion())) {
+            U.sleep(100);
+
+            f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+        }
+        f1.get();
+
+        long spend = (System.currentTimeMillis() - start) / 1000;
+
+        f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+        f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+
+        stopGrid(0);
+
+        //TODO: refactor to get futures by topology
+        while (f1 == ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture() ||
+            f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
+            U.sleep(100);
+
+        ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+        ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+
+        f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+
+        stopGrid(1);
+
+        while (f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
+            U.sleep(100);
+
+        ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+
+        checkData(grid(2));
+
+        log.info("Spend " + spend + " seconds to preload entries.");
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void _testOpPerSecRebalancingTest() throws Exception {
+        startGrid(0);
+
+        final AtomicBoolean cancelled = new AtomicBoolean(false);
+
+        generateData(grid(0));
+
+        startGrid(1);
+        startGrid(2);
+        startGrid(3);
+
+        Thread t = new Thread(new Runnable() {
+            @Override public void run() {
+
+                long spend = 0;
+
+                long ops = 0;
+
+                while (!cancelled.get()) {
+                    try {
+                        long start = System.currentTimeMillis();
+
+                        int size = 1000;
+
+                        for (int i = 0; i < size; i++)
+                            grid(3).cachex(CACHE_NAME_DHT).remove(i);
+
+                        for (int i = 0; i < size; i++)
+                            grid(3).cachex(CACHE_NAME_DHT).put(i, i);
+
+                        spend += System.currentTimeMillis() - start;
+
+                        ops += size * 2;
+                    }
+                    catch (IgniteCheckedException e) {
+                        e.printStackTrace();
+                    }
+
+                    log.info("Ops. per ms: " + ops / spend);
+                }
+            }
+        });
+        t.start();
+
+        stopGrid(0);
+        startGrid(0);
+
+        stopGrid(0);
+        startGrid(0);
+
+        stopGrid(0);
+        startGrid(0);
+
+        cancelled.set(true);
+        t.join();
+
+        checkData(grid(3));
+
+        //stopAllGrids();
+    }
+}
\ No newline at end of file