You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/25 10:57:47 UTC
[40/50] ignite git commit: ignite-1093
ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c02608b8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c02608b8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c02608b8
Branch: refs/heads/ignite-1093
Commit: c02608b81dab00321d5ac06ac703739fb4676557
Parents: b7ee4cc
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Fri Aug 21 20:02:35 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Fri Aug 21 20:02:35 2015 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 8 +--
.../configuration/IgniteConfiguration.java | 28 ++++++++
.../GridCachePartitionExchangeManager.java | 58 ++++++++++++++-
.../processors/cache/GridCachePreloader.java | 18 +++++
.../cache/GridCachePreloaderAdapter.java | 10 +++
.../dht/preloader/GridDhtPartitionDemander.java | 76 +++-----------------
.../dht/preloader/GridDhtPartitionSupplier.java | 64 +----------------
.../dht/preloader/GridDhtPreloader.java | 14 +++-
...GridCacheMassiveRebalancingSyncSelfTest.java | 2 +-
9 files changed, 141 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 0699124..deb3f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -1260,8 +1260,8 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
}
/**
- * Gets size of rebalancing thread pool. Note that size serves as a hint and implementation
- * may create more threads for rebalancing than specified here (but never less threads).
+ * Gets count of threads used at rebalancing.
+ * Limited by {@link IgniteConfiguration#maxRebalanceThreadPoolSize}
* <p>
* Default value is {@link #DFLT_REBALANCE_THREAD_POOL_SIZE}.
*
@@ -1272,8 +1272,8 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
}
/**
- * Sets size of rebalancing thread pool. Note that size serves as a hint and implementation may create more threads
- * for rebalancing than specified here (but never less threads).
+ * Sets count of threads used at rebalancing.
+ * Limited by {@link IgniteConfiguration#maxRebalanceThreadPoolSize}
*
* @param rebalancePoolSize Size of rebalancing thread pool.
* @return {@code this} for chaining.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 546c382..d09ac0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -141,6 +141,9 @@ public class IgniteConfiguration {
/** Default keep alive time for public thread pool. */
public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
+ /** Default limit of threads used at rebalance. */
+ public static final int DFLT_MAX_REBALANCE_THREAD_POOL_SIZE = 16;
+
/** Default max queue capacity of public thread pool. */
public static final int DFLT_PUBLIC_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE;
@@ -346,6 +349,9 @@ public class IgniteConfiguration {
/** Client mode flag. */
private Boolean clientMode;
+ /** Maximum rebalance thread pool size. */
+ private int maxRebalanceThreadPoolSize = DFLT_MAX_REBALANCE_THREAD_POOL_SIZE;
+
/** Transactions configuration. */
private TransactionConfiguration txCfg = new TransactionConfiguration();
@@ -1319,6 +1325,28 @@ public class IgniteConfiguration {
return this;
}
+
+ /**
+ * Gets count of available rebalancing threads.
+ * See {@link CacheConfiguration#setRebalanceThreadPoolSize} for details.
+ * @return count.
+ */
+ public int getMaxRebalanceThreadPoolSize(){
+ return maxRebalanceThreadPoolSize;
+ }
+
+ /**
+ * Sets count of available rebalancing threads.
+ * See {@link CacheConfiguration#setRebalanceThreadPoolSize} for details.
+ * @param size Size.
+ * @return {@code this} for chaining.
+ */
+ public IgniteConfiguration setMaxRebalanceThreadPoolSize(int size){
+ this.maxRebalanceThreadPoolSize = size;
+
+ return this;
+ }
+
/**
* Returns a collection of life-cycle beans. These beans will be automatically
* notified of grid life-cycle events. Use life-cycle beans whenever you
http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e00d3b7..b555584 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -48,7 +48,8 @@ import java.util.concurrent.locks.*;
import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.IgniteSystemProperties.*;
import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.*;
@@ -274,6 +275,40 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (reconnect)
reconnectExchangeFut = new GridFutureAdapter<>();
+ if (!cctx.kernalContext().clientNode()) {
+
+ for (int cnt = 0; cnt < cctx.gridConfig().getMaxRebalanceThreadPoolSize(); cnt++) {
+ final int idx = cnt;
+
+ cctx.io().addOrderedHandler(demanderTopic(cnt), new CI2<UUID, GridDhtPartitionSupplyMessageV2>() {
+ @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessageV2 m) {
+ enterBusy();
+
+ try {
+
+ cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage(idx, id, m);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+ });
+ cctx.io().addOrderedHandler(supplierTopic(cnt), new CI2<UUID, GridDhtPartitionDemandMessage>() {
+ @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+ if (!enterBusy())
+ return;
+
+ try {
+ cctx.cacheContext(m.cacheId).preloader().handleDemandMessage(id, m);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+ });
+ }
+ }
+
new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start();
onDiscoveryEvent(cctx.localNodeId(), fut);
@@ -336,6 +371,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
+ /**
+ * @param idx
+ * @return topic
+ */
+ public static Object demanderTopic(int idx) {
+ return TOPIC_CACHE.topic("Demander", idx);
+ }
+
+ /**
+ * @param idx
+ * @return topic
+ */
+ public static Object supplierTopic(int idx) {
+ return TOPIC_CACHE.topic("Supplier", idx);
+ }
+
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
cctx.gridEvents().removeLocalEventListener(discoLsnr);
@@ -360,6 +411,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
for (AffinityReadyFuture f : readyFuts.values())
f.onDone(err);
+ for (int cnt = 0; cnt < cctx.gridConfig().getMaxRebalanceThreadPoolSize(); cnt++) {
+ cctx.io().removeOrderedHandler(demanderTopic(cnt));
+ cctx.io().removeOrderedHandler(supplierTopic(cnt));
+ }
+
U.cancel(exchWorker);
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 1e915eb..105bec2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -132,4 +132,22 @@ public interface GridCachePreloader {
* Unwinds undeploys.
*/
public void unwindUndeploys();
+
+
+ /**
+ * Handles Supply message.
+ *
+ * @param idx Index.
+ * @param id Node Id.
+ * @param s Supply message.
+ */
+ public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s);
+
+ /**
+ * Handles Demand message.
+ *
+ * @param id Node Id.
+ * @param d Demand message.
+ */
+ public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 68deb2e..527e5bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -117,6 +117,16 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
+ @Override public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessageV2 s) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
return new GridFinishedFuture<>();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 0c30630..43c5484 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -39,10 +39,8 @@ import org.jsr166.*;
import java.util.*;
import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.GridTopic.*;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
import static org.apache.ignite.internal.processors.dr.GridDrType.*;
@@ -57,9 +55,6 @@ public class GridDhtPartitionDemander {
/** */
private final IgniteLogger log;
- /** */
- private final ReadWriteLock busyLock;
-
/** Preload predicate. */
private IgnitePredicate<GridCacheEntryInfo> preloadPred;
@@ -75,39 +70,16 @@ public class GridDhtPartitionDemander {
/**
* @param cctx Cache context.
- * @param busyLock Shutdown lock.
*/
- public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
+ public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) {
assert cctx != null;
- assert busyLock != null;
this.cctx = cctx;
- this.busyLock = busyLock;
log = cctx.logger(getClass());
boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
- if (enabled) {
-
- for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) {
- final int idx = cnt;
-
- cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionSupplyMessageV2>() {
- @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessageV2 m) {
- enterBusy();
-
- try {
- handleSupplyMessage(idx, id, m);
- }
- finally {
- leaveBusy();
- }
- }
- });
- }
- }
-
syncFut = new SyncFuture(null);
if (!enabled)
@@ -169,11 +141,6 @@ public class GridDhtPartitionDemander {
*
*/
void stop() {
- if (cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode()) {
- for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++)
- cctx.io().removeOrderedHandler(topic(cnt, cctx.cacheId()));
- }
-
lastExchangeFut = null;
lastTimeoutObj.set(null);
@@ -221,27 +188,6 @@ public class GridDhtPartitionDemander {
}
/**
- * @return {@code true} if entered to busy state.
- */
- private boolean enterBusy() {
- if (busyLock.readLock().tryLock())
- return true;
-
- if (log.isDebugEnabled())
- log.debug("Failed to enter to busy state (demander is stopping): " + cctx.nodeId());
-
- return false;
- }
-
- /**
- * @param idx
- * @return topic
- */
- static Object topic(int idx, int cacheId) {
- return TOPIC_CACHE.topic("Demander", cacheId, idx);
- }
-
- /**
* @return {@code True} if topology changed.
*/
private boolean topologyChanged(AffinityTopologyVersion topVer) {
@@ -249,13 +195,6 @@ public class GridDhtPartitionDemander {
}
/**
- *
- */
- private void leaveBusy() {
- busyLock.readLock().unlock();
- }
-
- /**
* @param type Type.
* @param discoEvt Discovery event.
*/
@@ -339,7 +278,8 @@ public class GridDhtPartitionDemander {
syncFut.append(node.id(), remainings);
- int lsnrCnt = cctx.config().getRebalanceThreadPoolSize();
+ int lsnrCnt = Math.min(cctx.config().getRebalanceThreadPoolSize(),
+ cctx.gridConfig().getMaxRebalanceThreadPoolSize());
List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
@@ -360,10 +300,10 @@ public class GridDhtPartitionDemander {
// Create copy.
GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
- initD.topic(topic(cnt, cctx.cacheId()));
+ initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt));
try {
- cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout());
+ cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to send partition demand message to local node", ex);
@@ -454,7 +394,7 @@ public class GridDhtPartitionDemander {
* @param id Node id.
* @param supply Supply.
*/
- private void handleSupplyMessage(
+ public void handleSupplyMessage(
int idx,
final UUID id,
final GridDhtPartitionSupplyMessageV2 supply) {
@@ -575,10 +515,10 @@ public class GridDhtPartitionDemander {
GridDhtPartitionDemandMessage nextD =
new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
- nextD.topic(topic(idx, cctx.cacheId()));
+ nextD.topic(GridCachePartitionExchangeManager.demanderTopic(idx));
// Send demand message.
- cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()),
+ cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(idx),
nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 546e67b..347a394 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -47,9 +47,6 @@ class GridDhtPartitionSupplier {
private final IgniteLogger log;
/** */
- private final ReadWriteLock busyLock;
-
- /** */
private GridDhtPartitionTopology top;
/** */
@@ -66,47 +63,17 @@ class GridDhtPartitionSupplier {
/**
* @param cctx Cache context.
- * @param busyLock Shutdown lock.
*/
- GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
+ GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) {
assert cctx != null;
- assert busyLock != null;
this.cctx = cctx;
- this.busyLock = busyLock;
log = cctx.logger(getClass());
top = cctx.dht().topology();
- if (!cctx.kernalContext().clientNode()) {
- for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) {
- cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
- @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
- if (!enterBusy())
- return;
-
- try {
- processMessage(m, id);
- }
- finally {
- leaveBusy();
- }
- }
- });
- }
- }
-
- depEnabled = cctx.gridDeploy().enabled();
- }
-
- /**
- * @param idx Index.
- * @param id Node id.
- * @return topic
- */
- static Object topic(int idx, int id) {
- return TOPIC_CACHE.topic("Supplier", idx, id);
+ depEnabled = cctx.gridDeploy().enabled();
}
/**
@@ -120,11 +87,6 @@ class GridDhtPartitionSupplier {
*/
void stop() {
top = null;
-
- if (!cctx.kernalContext().clientNode()) {
- for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++)
- cctx.io().removeOrderedHandler(topic(cnt, cctx.cacheId()));
- }
}
/**
@@ -137,30 +99,10 @@ class GridDhtPartitionSupplier {
}
/**
- * @return {@code true} if entered to busy state.
- */
- private boolean enterBusy() {
- if (busyLock.readLock().tryLock())
- return true;
-
- if (log.isDebugEnabled())
- log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId());
-
- return false;
- }
-
- /**
- *
- */
- private void leaveBusy() {
- busyLock.readLock().unlock();
- }
-
- /**
* @param d Demand message.
* @param id Node uuid.
*/
- private void processMessage(GridDhtPartitionDemandMessage d, UUID id) {
+ public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d) {
assert d != null;
assert id != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 7f99ebf..585566b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -162,8 +162,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
});
- supplier = new GridDhtPartitionSupplier(cctx, busyLock);
- demander = new GridDhtPartitionDemander(cctx, busyLock);
+ supplier = new GridDhtPartitionSupplier(cctx);
+ demander = new GridDhtPartitionDemander(cctx);
cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
@@ -349,6 +349,16 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
+ public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) {
+ demander.handleSupplyMessage(idx, id, s);
+ }
+
+ /** {@inheritDoc} */
+ public void handleDemandMessage(UUID id, GridDhtPartitionDemandMessage d){
+ supplier.handleDemandMessage(id, d);
+ }
+
+ /** {@inheritDoc} */
@Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException {
demander.addAssignments(assignments, forcePreload);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c02608b8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
index 1182254..80c75f8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
@@ -39,7 +39,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT
/** */
protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
- private static int TEST_SIZE = 1_120_000;
+ private static int TEST_SIZE = 1_000_000;
/** cache name. */
protected static String CACHE_NAME_DHT = "cache";