You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/25 10:57:47 UTC

[40/50] ignite git commit: ignite-1093

ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c02608b8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c02608b8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c02608b8

Branch: refs/heads/ignite-1093
Commit: c02608b81dab00321d5ac06ac703739fb4676557
Parents: b7ee4cc
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Fri Aug 21 20:02:35 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Fri Aug 21 20:02:35 2015 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |  8 +--
 .../configuration/IgniteConfiguration.java      | 28 ++++++++
 .../GridCachePartitionExchangeManager.java      | 58 ++++++++++++++-
 .../processors/cache/GridCachePreloader.java    | 18 +++++
 .../cache/GridCachePreloaderAdapter.java        | 10 +++
 .../dht/preloader/GridDhtPartitionDemander.java | 76 +++-----------------
 .../dht/preloader/GridDhtPartitionSupplier.java | 64 +----------------
 .../dht/preloader/GridDhtPreloader.java         | 14 +++-
 ...GridCacheMassiveRebalancingSyncSelfTest.java |  2 +-
 9 files changed, 141 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 0699124..deb3f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1260,8 +1260,8 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     }
 
     /**
-     * Gets size of rebalancing thread pool. Note that size serves as a hint and implementation
-     * may create more threads for rebalancing than specified here (but never less threads).
+     * Gets count of threads used at rebalancing.
+     * Limited by {@link IgniteConfiguration#maxRebalanceThreadPoolSize}
      * <p>
      * Default value is {@link #DFLT_REBALANCE_THREAD_POOL_SIZE}.
      *
@@ -1272,8 +1272,8 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     }
 
     /**
-     * Sets size of rebalancing thread pool. Note that size serves as a hint and implementation may create more threads
-     * for rebalancing than specified here (but never less threads).
+     * Sets count of threads used at rebalancing.
+     * Limited by {@link IgniteConfiguration#maxRebalanceThreadPoolSize}
      *
      * @param rebalancePoolSize Size of rebalancing thread pool.
      * @return {@code this} for chaining.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 546c382..d09ac0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -141,6 +141,9 @@ public class IgniteConfiguration {
     /** Default keep alive time for public thread pool. */
     public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
 
+    /** Default limit of threads used at rebalance. */
+    public static final int DFLT_MAX_REBALANCE_THREAD_POOL_SIZE = 16;
+
     /** Default max queue capacity of public thread pool. */
     public static final int DFLT_PUBLIC_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE;
 
@@ -346,6 +349,9 @@ public class IgniteConfiguration {
     /** Client mode flag. */
     private Boolean clientMode;
 
+    /** Maximum rebalance thread pool size. */
+    private int maxRebalanceThreadPoolSize = DFLT_MAX_REBALANCE_THREAD_POOL_SIZE;
+
     /** Transactions configuration. */
     private TransactionConfiguration txCfg = new TransactionConfiguration();
 
@@ -1319,6 +1325,28 @@ public class IgniteConfiguration {
         return this;
     }
 
+
+    /**
+     * Gets count of available rebalancing threads.
+     * See {@link CacheConfiguration#setRebalanceThreadPoolSize} for details.
+     * @return count.
+     */
+    public int getMaxRebalanceThreadPoolSize(){
+        return maxRebalanceThreadPoolSize;
+    }
+
+    /**
+     * Sets count of available rebalancing threads.
+     * See {@link CacheConfiguration#setRebalanceThreadPoolSize} for details.
+     * @param size Size.
+     * @return {@code this} for chaining.
+     */
+    public IgniteConfiguration setMaxRebalanceThreadPoolSize(int size){
+        this.maxRebalanceThreadPoolSize = size;
+
+        return this;
+    }
+
     /**
      * Returns a collection of life-cycle beans. These beans will be automatically
      * notified of grid life-cycle events. Use life-cycle beans whenever you

http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e00d3b7..b555584 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -48,7 +48,8 @@ import java.util.concurrent.locks.*;
 import static java.util.concurrent.TimeUnit.*;
 import static org.apache.ignite.IgniteSystemProperties.*;
 import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.*;
 
@@ -274,6 +275,40 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (reconnect)
             reconnectExchangeFut = new GridFutureAdapter<>();
 
+        if (!cctx.kernalContext().clientNode()) {
+
+            for (int cnt = 0; cnt < cctx.gridConfig().getMaxRebalanceThreadPoolSize(); cnt++) {
+                final int idx = cnt;
+
+                cctx.io().addOrderedHandler(demanderTopic(cnt), new CI2<UUID, GridDhtPartitionSupplyMessageV2>() {
+                    @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessageV2 m) {
+                        enterBusy();
+
+                        try {
+
+                            cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage(idx, id, m);
+                        }
+                        finally {
+                            leaveBusy();
+                        }
+                    }
+                });
+                cctx.io().addOrderedHandler(supplierTopic(cnt), new CI2<UUID, GridDhtPartitionDemandMessage>() {
+                    @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+                        if (!enterBusy())
+                            return;
+
+                        try {
+                            cctx.cacheContext(m.cacheId).preloader().handleDemandMessage(id, m);
+                        }
+                        finally {
+                            leaveBusy();
+                        }
+                    }
+                });
+            }
+        }
+
         new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start();
 
         onDiscoveryEvent(cctx.localNodeId(), fut);
@@ -336,6 +371,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         }
     }
 
+    /**
+     * @param idx
+     * @return topic
+     */
+    public static Object demanderTopic(int idx) {
+        return TOPIC_CACHE.topic("Demander", idx);
+    }
+
+    /**
+     * @param idx
+     * @return topic
+     */
+    public static Object supplierTopic(int idx) {
+        return TOPIC_CACHE.topic("Supplier", idx);
+    }
+
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
         cctx.gridEvents().removeLocalEventListener(discoLsnr);
@@ -360,6 +411,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         for (AffinityReadyFuture f : readyFuts.values())
             f.onDone(err);
 
+        for (int cnt = 0; cnt <  cctx.gridConfig().getMaxRebalanceThreadPoolSize(); cnt++) {
+            cctx.io().removeOrderedHandler(demanderTopic(cnt));
+            cctx.io().removeOrderedHandler(supplierTopic(cnt));
+        }
+
         U.cancel(exchWorker);
 
         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 1e915eb..105bec2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -132,4 +132,22 @@ public interface GridCachePreloader {
      * Unwinds undeploys.
      */
     public void unwindUndeploys();
+
+
+    /**
+     * Handles Supply message.
+     *
+     * @param idx Index.
+     * @param id Node Id.
+     * @param s Supply message.
+     */
+    public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s);
+
+    /**
+     * Handles Demand message.
+     *
+     * @param id Node Id.
+     * @param d Demand message.
+     */
+    public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 68deb2e..527e5bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -117,6 +117,16 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
+    @Override public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessageV2 s) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
         return new GridFinishedFuture<>();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 0c30630..43c5484 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -39,10 +39,8 @@ import org.jsr166.*;
 
 import java.util.*;
 import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
 
 import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.GridTopic.*;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
 import static org.apache.ignite.internal.processors.dr.GridDrType.*;
 
@@ -57,9 +55,6 @@ public class GridDhtPartitionDemander {
     /** */
     private final IgniteLogger log;
 
-    /** */
-    private final ReadWriteLock busyLock;
-
     /** Preload predicate. */
     private IgnitePredicate<GridCacheEntryInfo> preloadPred;
 
@@ -75,39 +70,16 @@ public class GridDhtPartitionDemander {
 
     /**
      * @param cctx Cache context.
-     * @param busyLock Shutdown lock.
      */
-    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
+    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) {
         assert cctx != null;
-        assert busyLock != null;
 
         this.cctx = cctx;
-        this.busyLock = busyLock;
 
         log = cctx.logger(getClass());
 
         boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
 
-        if (enabled) {
-
-            for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) {
-                final int idx = cnt;
-
-                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionSupplyMessageV2>() {
-                    @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessageV2 m) {
-                        enterBusy();
-
-                        try {
-                            handleSupplyMessage(idx, id, m);
-                        }
-                        finally {
-                            leaveBusy();
-                        }
-                    }
-                });
-            }
-        }
-
         syncFut = new SyncFuture(null);
 
         if (!enabled)
@@ -169,11 +141,6 @@ public class GridDhtPartitionDemander {
      *
      */
     void stop() {
-        if (cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode()) {
-            for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++)
-                cctx.io().removeOrderedHandler(topic(cnt, cctx.cacheId()));
-        }
-
         lastExchangeFut = null;
 
         lastTimeoutObj.set(null);
@@ -221,27 +188,6 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @return {@code true} if entered to busy state.
-     */
-    private boolean enterBusy() {
-        if (busyLock.readLock().tryLock())
-            return true;
-
-        if (log.isDebugEnabled())
-            log.debug("Failed to enter to busy state (demander is stopping): " + cctx.nodeId());
-
-        return false;
-    }
-
-    /**
-     * @param idx
-     * @return topic
-     */
-    static Object topic(int idx, int cacheId) {
-        return TOPIC_CACHE.topic("Demander", cacheId, idx);
-    }
-
-    /**
      * @return {@code True} if topology changed.
      */
     private boolean topologyChanged(AffinityTopologyVersion topVer) {
@@ -249,13 +195,6 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     *
-     */
-    private void leaveBusy() {
-        busyLock.readLock().unlock();
-    }
-
-    /**
      * @param type Type.
      * @param discoEvt Discovery event.
      */
@@ -339,7 +278,8 @@ public class GridDhtPartitionDemander {
 
                 syncFut.append(node.id(), remainings);
 
-                int lsnrCnt = cctx.config().getRebalanceThreadPoolSize();
+                int lsnrCnt = Math.min(cctx.config().getRebalanceThreadPoolSize(),
+                    cctx.gridConfig().getMaxRebalanceThreadPoolSize());
 
                 List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
 
@@ -360,10 +300,10 @@ public class GridDhtPartitionDemander {
                         // Create copy.
                         GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
 
-                        initD.topic(topic(cnt, cctx.cacheId()));
+                        initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt));
 
                         try {
-                            cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
+                            cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
                         }
                         catch (IgniteCheckedException ex) {
                             U.error(log, "Failed to send partition demand message to local node", ex);
@@ -454,7 +394,7 @@ public class GridDhtPartitionDemander {
      * @param id Node id.
      * @param supply Supply.
      */
-    private void handleSupplyMessage(
+    public void handleSupplyMessage(
         int idx,
         final UUID id,
         final GridDhtPartitionSupplyMessageV2 supply) {
@@ -575,10 +515,10 @@ public class GridDhtPartitionDemander {
                     GridDhtPartitionDemandMessage nextD =
                         new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
 
-                    nextD.topic(topic(idx, cctx.cacheId()));
+                    nextD.topic(GridCachePartitionExchangeManager.demanderTopic(idx));
 
                     // Send demand message.
-                    cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()),
+                    cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(idx),
                         nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 546e67b..347a394 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -47,9 +47,6 @@ class GridDhtPartitionSupplier {
     private final IgniteLogger log;
 
     /** */
-    private final ReadWriteLock busyLock;
-
-    /** */
     private GridDhtPartitionTopology top;
 
     /** */
@@ -66,47 +63,17 @@ class GridDhtPartitionSupplier {
 
     /**
      * @param cctx Cache context.
-     * @param busyLock Shutdown lock.
      */
-    GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
+    GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) {
         assert cctx != null;
-        assert busyLock != null;
 
         this.cctx = cctx;
-        this.busyLock = busyLock;
 
         log = cctx.logger(getClass());
 
         top = cctx.dht().topology();
 
-        if (!cctx.kernalContext().clientNode()) {
-            for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) {
-                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
-                    @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
-                        if (!enterBusy())
-                            return;
-
-                        try {
-                            processMessage(m, id);
-                        }
-                        finally {
-                            leaveBusy();
-                        }
-                    }
-                });
-            }
-        }
-
-        depEnabled = cctx.gridDeploy().enabled();
-    }
-
-    /**
-     * @param idx Index.
-     * @param id Node id.
-     * @return topic
-     */
-    static Object topic(int idx, int id) {
-        return TOPIC_CACHE.topic("Supplier", idx, id);
+         depEnabled = cctx.gridDeploy().enabled();
     }
 
     /**
@@ -120,11 +87,6 @@ class GridDhtPartitionSupplier {
      */
     void stop() {
         top = null;
-
-        if (!cctx.kernalContext().clientNode()) {
-            for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++)
-                cctx.io().removeOrderedHandler(topic(cnt, cctx.cacheId()));
-        }
     }
 
     /**
@@ -137,30 +99,10 @@ class GridDhtPartitionSupplier {
     }
 
     /**
-     * @return {@code true} if entered to busy state.
-     */
-    private boolean enterBusy() {
-        if (busyLock.readLock().tryLock())
-            return true;
-
-        if (log.isDebugEnabled())
-            log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId());
-
-        return false;
-    }
-
-    /**
-     *
-     */
-    private void leaveBusy() {
-        busyLock.readLock().unlock();
-    }
-
-    /**
      * @param d Demand message.
      * @param id Node uuid.
      */
-    private void processMessage(GridDhtPartitionDemandMessage d, UUID id) {
+    public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d) {
         assert d != null;
         assert id != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 7f99ebf..585566b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -162,8 +162,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                 }
             });
 
-        supplier = new GridDhtPartitionSupplier(cctx, busyLock);
-        demander = new GridDhtPartitionDemander(cctx, busyLock);
+        supplier = new GridDhtPartitionSupplier(cctx);
+        demander = new GridDhtPartitionDemander(cctx);
 
         cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
@@ -349,6 +349,16 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
+    public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) {
+        demander.handleSupplyMessage(idx, id, s);
+    }
+
+    /** {@inheritDoc} */
+    public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d){
+        supplier.handleDemandMessage(id, d);
+    }
+
+    /** {@inheritDoc} */
     @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException {
         demander.addAssignments(assignments, forcePreload);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
index 1182254..80c75f8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
@@ -39,7 +39,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
     /** */
     protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-    private static int TEST_SIZE = 1_120_000;
+    private static int TEST_SIZE = 1_000_000;
 
     /** cache name. */
     protected static String CACHE_NAME_DHT = "cache";