You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/07/31 17:19:15 UTC
incubator-ignite git commit: ignite-1093 Parallel demandPool
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1093 [created] 5e3d8008c
ignite-1093 Parallel demandPool
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5e3d8008
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5e3d8008
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5e3d8008
Branch: refs/heads/ignite-1093
Commit: 5e3d8008c3e6d92be0374aadb47312d6c2a9f49a
Parents: 5288b2d
Author: Anton Vinogradov <av...@gridgain.com>
Authored: Fri Jul 31 18:17:25 2015 +0300
Committer: Anton Vinogradov <av...@gridgain.com>
Committed: Fri Jul 31 18:17:25 2015 +0300
----------------------------------------------------------------------
.../preloader/GridDhtPartitionDemandPool.java | 410 +++++++++----------
.../preloader/GridDhtPartitionSupplyPool.java | 10 +-
.../dht/preloader/GridDhtPreloader.java | 5 -
.../GridCacheMassiveRebalancingSelfTest.java | 107 +++++
4 files changed, 302 insertions(+), 230 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e3d8008/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index a6e6c4d..50c5e90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -27,7 +27,6 @@ import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.timeout.*;
-import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -36,6 +35,7 @@ import org.apache.ignite.internal.util.worker.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.thread.*;
import org.jetbrains.annotations.*;
+import org.jsr166.*;
import java.util.*;
import java.util.concurrent.*;
@@ -187,16 +187,6 @@ public class GridDhtPartitionDemandPool {
}
/**
- * Wakes up demand workers when new exchange future was added.
- */
- void onExchangeFutureAdded() {
- synchronized (dmdWorkers) {
- for (DemandWorker w : dmdWorkers)
- w.addMessage(DUMMY_TOP);
- }
- }
-
- /**
* Force preload.
*/
void forcePreload() {
@@ -339,11 +329,8 @@ public class GridDhtPartitionDemandPool {
assert assigns != null;
synchronized (dmdWorkers) {
- for (DemandWorker w : dmdWorkers) {
+ for (DemandWorker w : dmdWorkers)
w.addAssignments(assigns);
-
- w.addMessage(DUMMY_TOP);
- }
}
}
else if (delay > 0) {
@@ -403,13 +390,6 @@ public class GridDhtPartitionDemandPool {
/** Partition-to-node assignments. */
private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
- /** Message queue. */
- private final LinkedBlockingDeque<SupplyMessage> msgQ =
- new LinkedBlockingDeque<>();
-
- /** Counter. */
- private long cntr;
-
/** Hide worker logger and use cache logger instead. */
private IgniteLogger log = GridDhtPartitionDemandPool.this.log;
@@ -444,23 +424,6 @@ public class GridDhtPartitionDemandPool {
}
/**
- * @param msg Message.
- */
- private void addMessage(SupplyMessage msg) {
- if (!enterBusy())
- return;
-
- try {
- assert dummyTopology(msg) || msg.supply().workerId() == id;
-
- msgQ.offer(msg);
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
* @param timeout Timed out value.
*/
private void growTimeout(long timeout) {
@@ -558,14 +521,6 @@ public class GridDhtPartitionDemandPool {
}
/**
- * @param idx Unique index for this topic.
- * @return Topic for partition.
- */
- public Object topic(long idx) {
- return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
- }
-
- /**
* @param node Node to demand from.
* @param topVer Topology version.
* @param d Demand message.
@@ -576,243 +531,258 @@ public class GridDhtPartitionDemandPool {
* @throws IgniteCheckedException If failed to send message.
*/
private Set<Integer> demandFromNode(
- ClusterNode node,
+ final ClusterNode node,
final AffinityTopologyVersion topVer,
- GridDhtPartitionDemandMessage d,
- GridDhtPartitionsExchangeFuture exchFut
+ final GridDhtPartitionDemandMessage d,
+ final GridDhtPartitionsExchangeFuture exchFut
) throws InterruptedException, IgniteCheckedException {
- GridDhtPartitionTopology top = cctx.dht().topology();
+ final GridDhtPartitionTopology top = cctx.dht().topology();
- cntr++;
+ long timeout = GridDhtPartitionDemandPool.this.timeout.get();
- d.topic(topic(cntr));
+ d.timeout(timeout);
d.workerId(id);
- Set<Integer> missed = new HashSet<>();
+ final Set<Integer> missed = new HashSet<>();
- // Get the same collection that will be sent in the message.
- Collection<Integer> remaining = d.partitions();
+ final ConcurrentHashMap8<Integer, Boolean> remaining = new ConcurrentHashMap8<>();
- // Drain queue before processing a new node.
- drainQueue();
+ for (int p : d.partitions())
+ remaining.put(p, false);
if (isCancelled() || topologyChanged())
return missed;
- cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
- @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
- addMessage(new SupplyMessage(nodeId, msg));
- }
- });
+ for (int p : d.partitions()) {
+ cctx.io().addOrderedHandler(topic(p, node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+ @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
+ handleSupplyMessage(new SupplyMessage(nodeId, msg), node, topVer, top, remaining,
+ exchFut, missed, d);
+ }
+ });
+ }
try {
- boolean retry;
-
- // DoWhile.
- // =======
- do {
- retry = false;
-
- // Create copy.
- d = new GridDhtPartitionDemandMessage(d, remaining);
-
- long timeout = GridDhtPartitionDemandPool.this.timeout.get();
-
- d.timeout(timeout);
+ Iterator<Integer> it = remaining.keySet().iterator();
- if (log.isDebugEnabled())
- log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
-
- // Send demand message.
- cctx.io().send(node, d, cctx.ioPolicy());
-
- // While.
- // =====
- while (!isCancelled() && !topologyChanged()) {
- SupplyMessage s = poll(msgQ, timeout, this);
-
- // If timed out.
- if (s == null) {
- if (msgQ.isEmpty()) { // Safety check.
- U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
- " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
- " configuration properties).");
+ final int maxC = Runtime.getRuntime().availableProcessors() / 2; //Todo: make param
- growTimeout(timeout);
+ int sent = 0;
- // Ordered listener was removed if timeout expired.
- cctx.io().removeOrderedHandler(d.topic());
+ while (sent < maxC && it.hasNext()) {
+ int p = it.next();
- // Must create copy to be able to work with IO manager thread local caches.
- d = new GridDhtPartitionDemandMessage(d, remaining);
+ Collection<Integer> ps = new ArrayList<>(1);
- // Create new topic.
- d.topic(topic(++cntr));
+ boolean res = remaining.replace(p, false, true);
- // Create new ordered listener.
- cctx.io().addOrderedHandler(d.topic(),
- new CI2<UUID, GridDhtPartitionSupplyMessage>() {
- @Override public void apply(UUID nodeId,
- GridDhtPartitionSupplyMessage msg) {
- addMessage(new SupplyMessage(nodeId, msg));
- }
- });
-
- // Resend message with larger timeout.
- retry = true;
-
- break; // While.
- }
- else
- continue; // While.
- }
+ assert res;
- // If topology changed.
- if (dummyTopology(s)) {
- if (topologyChanged())
- break; // While.
- else
- continue; // While.
- }
+ ps.add(p);
- // Check that message was received from expected node.
- if (!s.senderId().equals(node.id())) {
- U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
- ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
+ // Create copy.
+ GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, ps);
- continue; // While.
- }
+ initD.topic(topic(p, node.id()));
- if (log.isDebugEnabled())
- log.debug("Received supply message: " + s);
+ // Send initial demand message.
+ cctx.io().send(node, initD, cctx.ioPolicy());
- GridDhtPartitionSupplyMessage supply = s.supply();
+ sent++;
+ }
- // Check whether there were class loading errors on unmarshal
- if (supply.classError() != null) {
- if (log.isDebugEnabled())
- log.debug("Class got undeployed during preloading: " + supply.classError());
+ do {
+ U.sleep(1000);//Todo: improve
+ }
+ while (!isCancelled() && !topologyChanged() && !remaining.isEmpty());
- retry = true;
+ return missed;
+ }
+ finally {
+ for (int p : d.partitions())
+ cctx.io().removeOrderedHandler(topic(p, node.id()));
+ }
+ }
- // Quit preloading.
- break;
- }
+ /**
+ * @param p Partition.
+ * @param id remote node id.
+ * @return topic
+ */
+ private Object topic(int p, UUID id) {
+ return TOPIC_CACHE.topic("Preloading", id, cctx.cacheId(), p);
+ }
- // Preload.
- for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
- int p = e.getKey();
+ /**
+ * @param s Supply message.
+ * @param node Node.
+ * @param topVer Topology version.
+ * @param top Topology.
+ * @param remaining Remaining.
+ * @param exchFut Exchange future.
+ * @param missed Missed.
+ * @param d initial DemandMessage.
+ */
+ private void handleSupplyMessage(
+ SupplyMessage s,
+ ClusterNode node,
+ AffinityTopologyVersion topVer,
+ GridDhtPartitionTopology top,
+ ConcurrentHashMap8<Integer, Boolean> remaining,
+ GridDhtPartitionsExchangeFuture exchFut,
+ Set<Integer> missed,
+ GridDhtPartitionDemandMessage d) {
+
+ //Todo: check and remove
+ // Check that message was received from expected node.
+ if (!s.senderId().equals(node.id())) {
+ U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
+ ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
- if (cctx.affinity().localNode(p, topVer)) {
- GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+ return;
+ }
- assert part != null;
+ if (log.isDebugEnabled())
+ log.debug("Received supply message: " + s);
- if (part.state() == MOVING) {
- boolean reserved = part.reserve();
+ GridDhtPartitionSupplyMessage supply = s.supply();
- assert reserved : "Failed to reserve partition [gridName=" +
- cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+ // Check whether there were class loading errors on unmarshal
+ if (supply.classError() != null) {
+ if (log.isDebugEnabled())
+ log.debug("Class got undeployed during preloading: " + supply.classError());
- part.lock();
+ return;
+ }
- try {
- Collection<Integer> invalidParts = new GridLeanSet<>();
+ assert supply.infos().entrySet().size() == 1;//Todo: remove after supply message refactoring
- // Loop through all received entries and try to preload them.
- for (GridCacheEntryInfo entry : e.getValue().infos()) {
- if (!invalidParts.contains(p)) {
- if (!part.preloadingPermitted(entry.key(), entry.version())) {
- if (log.isDebugEnabled())
- log.debug("Preloading is not permitted for entry due to " +
- "evictions [key=" + entry.key() +
- ", ver=" + entry.version() + ']');
+ // Preload.
+ for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {//todo:only one partition (supply refactoring)
+ int p = e.getKey();
- continue;
- }
+ if (cctx.affinity().localNode(p, topVer)) {
+ GridDhtLocalPartition part = top.localPartition(p, topVer, true);
- if (!preloadEntry(node, p, entry, topVer)) {
- invalidParts.add(p);
+ assert part != null;
- if (log.isDebugEnabled())
- log.debug("Got entries for invalid partition during " +
- "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
- }
- }
- }
+ if (part.state() == MOVING) {
+ boolean reserved = part.reserve();
- boolean last = supply.last().contains(p);
+ assert reserved : "Failed to reserve partition [gridName=" +
+ cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
- // If message was last for this partition,
- // then we take ownership.
- if (last) {
- remaining.remove(p);
+ part.lock();
- top.own(part);
+ try {
+ // Loop through all received entries and try to preload them.
+ for (GridCacheEntryInfo entry : e.getValue().infos()) {
+ if (!part.preloadingPermitted(entry.key(), entry.version())) {
+ if (log.isDebugEnabled())
+ log.debug("Preloading is not permitted for entry due to " +
+ "evictions [key=" + entry.key() +
+ ", ver=" + entry.version() + ']');
- if (log.isDebugEnabled())
- log.debug("Finished rebalancing partition: " + part);
+ continue;
+ }
+ try {
+ if (!preloadEntry(node, p, entry, topVer)) {
+ if (log.isDebugEnabled())
+ log.debug("Got entries for invalid partition during " +
+ "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
- preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
- exchFut.discoveryEvent());
- }
- }
- finally {
- part.unlock();
- part.release();
+ break;
}
}
- else {
- remaining.remove(p);
+ catch (IgniteCheckedException ex) {
+ cancel();
- if (log.isDebugEnabled())
- log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+ return;
}
}
- else {
- remaining.remove(p);
- if (log.isDebugEnabled())
- log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
- }
- }
+ boolean last = supply.last().contains(p);//Todo: refactor as boolean "last"
- remaining.removeAll(s.supply().missed());
+ // If message was last for this partition,
+ // then we take ownership.
+ if (last) {
+ top.own(part);
- // Only request partitions based on latest topology version.
- for (Integer miss : s.supply().missed())
- if (cctx.affinity().localNode(miss, topVer))
- missed.add(miss);
+ if (log.isDebugEnabled())
+ log.debug("Finished rebalancing partition: " + part);
- if (remaining.isEmpty())
- break; // While.
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+ preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+ exchFut.discoveryEvent());
- if (s.supply().ack()) {
- retry = true;
+ remaining.remove(p);
- break;
+ demandNextPartition(node, remaining, d);
+ }
}
+ finally {
+ part.unlock();
+ part.release();
+ }
+ }
+ else {
+ remaining.remove(p);
+
+ if (log.isDebugEnabled())
+ log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
}
}
- while (retry && !isCancelled() && !topologyChanged());
+ else {
+ remaining.remove(p);
- return missed;
- }
- finally {
- cctx.io().removeOrderedHandler(d.topic());
+ if (log.isDebugEnabled())
+ log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+ }
}
+
+ for (Integer miss : s.supply().missed()) // Todo: miss as param, not collection
+ remaining.remove(miss);
+
+ // Only request partitions based on latest topology version.
+ for (Integer miss : s.supply().missed())
+ if (cctx.affinity().localNode(miss, topVer))
+ missed.add(miss);
}
/**
- * @throws InterruptedException If interrupted.
+ * @param node Node.
+ * @param remaining Remaining.
+ * @param d initial DemandMessage.
*/
- private void drainQueue() throws InterruptedException {
- while (!msgQ.isEmpty()) {
- SupplyMessage msg = msgQ.take();
+ private void demandNextPartition(
+ final ClusterNode node,
+ final ConcurrentHashMap8<Integer, Boolean> remaining,
+ final GridDhtPartitionDemandMessage d
+ ) {
+ try {
+ for (Integer p : remaining.keySet()) {
+ if (remaining.replace(p, false, true)) {
+ Collection<Integer> nextPs = new ArrayList<>(1);
- if (log.isDebugEnabled())
- log.debug("Drained supply message: " + msg);
+ nextPs.add(p);
+
+ // Create copy.
+ GridDhtPartitionDemandMessage nextD = new GridDhtPartitionDemandMessage(d, nextPs);
+
+ nextD.topic(topic(p, node.id()));
+
+ // Send demand message.
+ cctx.io().send(node, nextD, cctx.ioPolicy());
+
+ break;
+ }
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+ "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
+
+ cancel();
}
}
@@ -980,7 +950,7 @@ public class GridDhtPartitionDemandPool {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
+ return S.toString(DemandWorker.class, this, "assignQ", assignQ, "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e3d8008/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 13cfef3..42d6bb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -452,11 +452,11 @@ class GridDhtPartitionSupplyPool {
// Mark as last supply message.
s.last(part);
- if (ack) {
- s.markAck();
-
- break; // Partition for loop.
- }
+// if (ack) {
+// s.markAck();
+//
+// break; // Partition for loop.
+// }
}
finally {
loc.release();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e3d8008/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index a43ebe2..fbcbc37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -244,11 +244,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public void onExchangeFutureAdded() {
- demandPool.onExchangeFutureAdded();
- }
-
- /** {@inheritDoc} */
@Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
demandPool.updateLastExchangeFuture(lastFut);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e3d8008/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
new file mode 100644
index 0000000..e90b7af
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ *
+ */
+public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ private static int TEST_SIZE = 1_024_000;
+
+ /** cache name. */
+ protected static String CACHE_NAME_DHT = "cache";
+
+ @Override protected long getTestTimeout() {
+ return Long.MAX_VALUE;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+ CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+
+ cacheCfg.setName(CACHE_NAME_DHT);
+ cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+ //cacheCfg.setRebalanceBatchSize(1024);
+ cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ //cacheCfg.setRebalanceTimeout(1000000);
+ cacheCfg.setBackups(1);
+
+ iCfg.setCacheConfiguration(cacheCfg);
+ return iCfg;
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testMassiveRebalancing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) {
+ for (int i = 0; i < TEST_SIZE; i++) {
+ if (i % 1_000_000 == 0)
+ log.info("Prepared " + i / 1_000_000 + "m entries.");
+
+ stmr.addData(i, i);
+ }
+ }
+
+ log.info("Preloading started.");
+
+ long start = System.currentTimeMillis();
+
+ // startGrid(1);
+
+ startGrid(2);
+
+ long spend = (System.currentTimeMillis() - start) / 1000;
+
+ stopGrid(0);
+
+ // Thread.sleep(10000);
+
+ // stopGrid(1);
+
+ for (int i = 0; i < TEST_SIZE; i++) {
+ if (i % 1_000_000 == 0)
+ log.info("Checked " + i / 1_000_000 + "m entries.");
+
+ assert grid(2).cachex(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i + " does not match";
+ }
+
+ log.info("Spend " + spend + " seconds to preload entries.");
+
+ stopAllGrids();
+ }
+}
\ No newline at end of file