You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/14 18:38:28 UTC
[34/34] incubator-ignite git commit: ignite-1093
ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/64319443
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/64319443
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/64319443
Branch: refs/heads/ignite-1093
Commit: 64319443ab55aa4a0fc4c56182c774dec8446d48
Parents: 50d32b3
Author: Anton Vinogradov <vi...@gmail.com>
Authored: Fri Aug 14 16:29:31 2015 +0300
Committer: Anton Vinogradov <vi...@gmail.com>
Committed: Fri Aug 14 16:29:31 2015 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 27 ++
.../communication/GridIoMessageFactory.java | 7 +-
.../processors/cache/GridCacheIoManager.java | 8 +
.../dht/preloader/GridDhtPartitionDemander.java | 156 ++++---
.../dht/preloader/GridDhtPartitionSupplier.java | 25 +-
.../GridDhtPartitionSupplyMessageV2.java | 423 +++++++++++++++++++
.../GridCacheMassiveRebalancingSelfTest.java | 210 ---------
...ridCacheMassiveRebalancingAsyncSelfTest.java | 37 ++
...GridCacheMassiveRebalancingSyncSelfTest.java | 252 +++++++++++
9 files changed, 864 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 3ad0f01..a19e136 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -57,6 +57,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Default rebalance timeout (ms).*/
public static final long DFLT_REBALANCE_TIMEOUT = 10000;
+ /** Default rebalance batches count. */
+ public static final long DFLT_REBALANCE_BATCHES_COUNT = 3;
+
/** Time in milliseconds to wait between rebalance messages to avoid overloading CPU. */
public static final long DFLT_REBALANCE_THROTTLE = 0;
@@ -240,6 +243,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Off-heap memory size. */
private long offHeapMaxMem = DFLT_OFFHEAP_MEMORY;
+ /** Rebalance batches count. */
+ private long rebalanceBatchesCount = DFLT_REBALANCE_BATCHES_COUNT;
+
/** */
private boolean swapEnabled = DFLT_SWAP_ENABLED;
@@ -1751,6 +1757,27 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
}
/**
+ * To gain better rebalancing performance supplier node can provide mode than one batch at start and provide
+ * one new to each next demand request.
+ *
+ * Gets number of batches generated by supply node at rebalancing start.
+ *
+ * @return
+ */
+ public long getRebalanceBatchesCount() {
+ return rebalanceBatchesCount;
+ }
+
+ /**
+ * Sets number of batches generated by supply node at rebalancing start.
+ *
+ * @param rebalanceBatchesCnt batches count.
+ */
+ public void setRebalanceBatchesCount(long rebalanceBatchesCnt) {
+ this.rebalanceBatchesCount = rebalanceBatchesCnt;
+ }
+
+ /**
* Gets cache store session listener factories.
*
* @return Cache store session listener factories.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 7fe8da8..7ddbfb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -600,7 +600,12 @@ public class GridIoMessageFactory implements MessageFactory {
break;
- // [-3..112] - this
+ case 113:
+ msg = new GridDhtPartitionSupplyMessageV2();
+
+ break;
+
+ // [-3..113] - this
// [120..123] - DR
// [-4..-22] - SQL
default:
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 84e4dc2..da55f7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -503,6 +503,14 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
+ case 113: {
+ GridDhtPartitionSupplyMessageV2 req = (GridDhtPartitionSupplyMessageV2)msg;
+
+ U.error(log, "Supply message v2 cannot be unmarshalled.", req.classError());
+ }
+
+ break;
+
default:
throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
+ msg + "]");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 16f7a61..262ccb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -72,9 +72,6 @@ public class GridDhtPartitionDemander {
/** Last exchange future. */
private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
- /** Assignments. */
- private volatile GridDhtPreloaderAssignments assigns;
-
/**
* @param cctx Cache context.
* @param busyLock Shutdown lock.
@@ -95,8 +92,8 @@ public class GridDhtPartitionDemander {
for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) {
final int idx = cnt;
- cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
- @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessage m) {
+ cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionSupplyMessageV2>() {
+ @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessageV2 m) {
enterBusy();
try {
@@ -110,7 +107,7 @@ public class GridDhtPartitionDemander {
}
}
- syncFut = new SyncFuture();
+ syncFut = new SyncFuture(null);
if (!enabled)
// Calling onDone() immediately since preloading is disabled.
@@ -282,13 +279,15 @@ public class GridDhtPartitionDemander {
if (delay == 0 || force) {
assert assigns != null;
- AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+ AffinityTopologyVersion topVer = assigns.topologyVersion();
- if (this.assigns != null) {
+ if (syncFut.isInited()) {
syncFut.get();
- syncFut = new SyncFuture();
+ syncFut = new SyncFuture(assigns);
}
+ else
+ syncFut.init(assigns);
if (assigns.isEmpty() || topologyChanged(topVer)) {
syncFut.onDone();
@@ -296,28 +295,30 @@ public class GridDhtPartitionDemander {
return;
}
- this.assigns = assigns;
-
for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
GridDhtPartitionDemandMessage d = e.getValue();
d.timeout(cctx.config().getRebalanceTimeout());
d.workerId(0);//old api support.
- ClusterNode node = e.getKey();
+ final ClusterNode node = e.getKey();
final long start = U.currentTimeMillis();
final CacheConfiguration cfg = cctx.config();
+ final AffinityTopologyVersion top = d.topologyVersion();
+
if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
- ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + "]");
-
- syncFut.listen(new CI1<Object>() {
- @Override public void apply(Object t) {
- U.log(log, "Completed rebalancing [cache=" + cctx.name() + ", mode="
- + cfg.getRebalanceMode() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
+ ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
+
+ syncFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> t) {
+ Boolean cancelled = ((SyncFuture)t).cancelled();
+ U.log(log, (cancelled ? "Cancelled" : "Completed") + " rebalancing [cache=" + cctx.name() + ", mode="
+ + cfg.getRebalanceMode() + ", from node=" + node.id() + ", topology=" + top +
+ ", time=" + (U.currentTimeMillis() - start) + " ms]");
}
});
}
@@ -394,7 +395,7 @@ public class GridDhtPartitionDemander {
* @param c Partitions.
* @return String representation of partitions list.
*/
- private String partitionsList(Collection<Integer> c){
+ private String partitionsList(Collection<Integer> c) {
LinkedList<Integer> s = new LinkedList<>(c);
Collections.sort(s);
@@ -446,21 +447,19 @@ public class GridDhtPartitionDemander {
private void handleSupplyMessage(
int idx,
final UUID id,
- final GridDhtPartitionSupplyMessage supply) {
- ClusterNode node = cctx.node(id);
-
- assert node != null;
-
- GridDhtPartitionDemandMessage d = assigns.get(node);
-
- AffinityTopologyVersion topVer = d.topologyVersion();
+ final GridDhtPartitionSupplyMessageV2 supply) {
+ AffinityTopologyVersion topVer = supply.topologyVersion();
if (topologyChanged(topVer)) {
- syncFut.cancel(id);
+ syncFut.onCancel(id, topVer);
return;
}
+ ClusterNode node = cctx.node(id);
+
+ assert node != null;
+
if (log.isDebugEnabled())
log.debug("Received supply message: " + supply);
@@ -469,15 +468,13 @@ public class GridDhtPartitionDemander {
if (log.isDebugEnabled())
log.debug("Class got undeployed during preloading: " + supply.classError());
- syncFut.cancel(id);
+ syncFut.onCancel(id, topVer);
return;
}
final GridDhtPartitionTopology top = cctx.dht().topology();
- GridDhtPartitionsExchangeFuture exchFut = assigns.exchangeFuture();
-
try {
// Preload.
@@ -524,14 +521,10 @@ public class GridDhtPartitionDemander {
if (last) {
top.own(part);
- syncFut.onPartitionDone(id, p);
+ syncFut.onPartitionDone(id, p, topVer);
if (log.isDebugEnabled())
log.debug("Finished rebalancing partition: " + part);
-
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
- preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
- exchFut.discoveryEvent());
}
}
finally {
@@ -540,14 +533,14 @@ public class GridDhtPartitionDemander {
}
}
else {
- syncFut.onPartitionDone(id, p);
+ syncFut.onPartitionDone(id, p, topVer);
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
}
}
else {
- syncFut.onPartitionDone(id, p);
+ syncFut.onPartitionDone(id, p, topVer);
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
@@ -557,35 +550,40 @@ public class GridDhtPartitionDemander {
// Only request partitions based on latest topology version.
for (Integer miss : supply.missed())
if (cctx.affinity().localNode(miss, topVer))
- syncFut.onMissedPartition(id, miss);
+ syncFut.onMissedPartition(id, miss, topVer);
for (Integer miss : supply.missed())
- syncFut.onPartitionDone(id, miss);
+ syncFut.onPartitionDone(id, miss, topVer);
if (!syncFut.isDone()) {
- // Create copy.
- GridDhtPartitionDemandMessage nextD =
- new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
+ GridDhtPartitionDemandMessage d = syncFut.getDemandMessage(topVer, node);
+
+ if (d != null) {
+
+ // Create copy.
+ GridDhtPartitionDemandMessage nextD =
+ new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet());
- nextD.topic(topic(idx, cctx.cacheId()));
+ nextD.topic(topic(idx, cctx.cacheId()));
- // Send demand message.
- cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()),
- nextD, cctx.ioPolicy(), d.timeout());
+ // Send demand message.
+ cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()),
+ nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+ }
}
}
catch (ClusterTopologyCheckedException e) {
if (log.isDebugEnabled())
log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
", msg=" + e.getMessage() + ']');
- syncFut.cancel(id);
+ syncFut.onCancel(id, topVer);
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to receive partitions from node (rebalancing will not " +
- "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex);
+ "fully finish) [node=" + node.id() + ", msg=" + supply + ']', ex);
- syncFut.cancel(id);
+ syncFut.onCancel(id, topVer);
}
}
@@ -687,7 +685,7 @@ public class GridDhtPartitionDemander {
/**
*
*/
- private class SyncFuture extends GridFutureAdapter<Object> {
+ public class SyncFuture extends GridFutureAdapter<Boolean> {
/** */
private static final long serialVersionUID = 1L;
@@ -695,32 +693,74 @@ public class GridDhtPartitionDemander {
private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
- public void append(UUID nodeId, Collection<Integer> parts) {
+ /** Assignments. */
+ private volatile GridDhtPreloaderAssignments assigns;
+
+ private volatile boolean cancelled = false;
+
+ SyncFuture(GridDhtPreloaderAssignments assigns) {
+ this.assigns = assigns;
+ }
+
+ public AffinityTopologyVersion topologyVersion() {
+ return assigns != null ? assigns.topologyVersion() : null;
+ }
+
+ void init(
+ GridDhtPreloaderAssignments assigns) {
+ this.assigns = assigns;
+ }
+
+ boolean isInited() {
+ return assigns != null;
+ }
+
+ void append(UUID nodeId, Collection<Integer> parts) {
remaining.put(nodeId, parts);
missed.put(nodeId, new GridConcurrentHashSet<Integer>());
}
- void cancel(UUID nodeId) {
- if (isDone())
+ GridDhtPartitionDemandMessage getDemandMessage(AffinityTopologyVersion topVer, ClusterNode node) {
+ if (!topVer.equals(assigns.topologyVersion()))
+ return null;
+
+ return assigns.get(node);
+ }
+
+ void onCancel(UUID nodeId, AffinityTopologyVersion topVer) {
+ if (isDone() || !topVer.equals(assigns.topologyVersion()))
return;
remaining.remove(nodeId);
+ cancelled = true;
+
checkIsDone();
}
- void onMissedPartition(UUID nodeId, int p) {
+ boolean cancelled() {
+ return cancelled;
+ }
+
+ void onMissedPartition(UUID nodeId, int p, AffinityTopologyVersion topVer) {
+ if (isDone() || !topVer.equals(assigns.topologyVersion()))
+ return;
+
if (missed.get(nodeId) == null)
missed.put(nodeId, new GridConcurrentHashSet<Integer>());
missed.get(nodeId).add(p);
}
- void onPartitionDone(UUID nodeId, int p) {
- if (isDone())
+ void onPartitionDone(UUID nodeId, int p, AffinityTopologyVersion topVer) {
+ if (isDone() || !topVer.equals(assigns.topologyVersion()))
return;
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+ preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+ assigns.exchangeFuture().discoveryEvent());
+
Collection<Integer> parts = remaining.get(nodeId);
parts.remove(p);
@@ -758,7 +798,7 @@ public class GridDhtPartitionDemander {
cctx.shared().exchange().scheduleResendPartitions();//TODO: Is in necessary?
- onDone();
+ onDone(cancelled);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index b948fbd..c496f8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -170,8 +170,8 @@ class GridDhtPartitionSupplier {
if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
return;
- GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
- d.updateSequence(), cctx.cacheId());
+ GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(d.workerId(),
+ d.updateSequence(), cctx.cacheId(), d.topologyVersion());
long preloadThrottle = cctx.config().getRebalanceThrottle();
@@ -180,12 +180,13 @@ class GridDhtPartitionSupplier {
T2<UUID, Object> scId = new T2<>(id, d.topic());
try {
- SupplyContext sctx = scMap.remove(scId);
-
if (!d.partitions().isEmpty()) {//Only initial request contains partitions.
doneMap.remove(scId);
+ scMap.remove(scId);
}
+ SupplyContext sctx = scMap.remove(scId);
+
if (doneMap.get(scId) != null)
return;
@@ -195,7 +196,7 @@ class GridDhtPartitionSupplier {
boolean newReq = true;
- long maxBatchesCnt = 3;//Todo: param
+ long maxBatchesCnt = cctx.config().getRebalanceBatchesCount();
if (sctx != null) {
phase = sctx.phase;
@@ -273,8 +274,8 @@ class GridDhtPartitionSupplier {
return;
}
else {
- s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
- cctx.cacheId());
+ s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+ cctx.cacheId(), d.topologyVersion());
}
}
@@ -340,8 +341,8 @@ class GridDhtPartitionSupplier {
return;
}
else {
- s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
- cctx.cacheId());
+ s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+ cctx.cacheId(), d.topologyVersion());
}
}
@@ -443,8 +444,8 @@ class GridDhtPartitionSupplier {
return;
}
else {
- s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
- cctx.cacheId());
+ s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(),
+ cctx.cacheId(), d.topologyVersion());
}
}
@@ -491,7 +492,7 @@ class GridDhtPartitionSupplier {
* @return {@code True} if message was sent, {@code false} if recipient left grid.
* @throws IgniteCheckedException If failed.
*/
- private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+ private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessageV2 s)
throws IgniteCheckedException {
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
new file mode 100644
index 0000000..93d0db6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Partition supply message.
+ */
+public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements GridCacheDeployable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Worker ID. */
+ private int workerId = -1;
+
+ /** Update sequence. */
+ private long updateSeq;
+
+ /** Acknowledgement flag. */
+ private boolean ack;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
+ /** Partitions that have been fully sent. */
+ @GridDirectCollection(int.class)
+ private Collection<Integer> last;
+
+ /** Partitions which were not found. */
+ @GridToStringInclude
+ @GridDirectCollection(int.class)
+ private Collection<Integer> missed;
+
+ /** Entries. */
+ @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
+ private Map<Integer, CacheEntryInfoCollection> infos = new HashMap<>();
+
+ /** Message size. */
+ @GridDirectTransient
+ private int msgSize;
+
+ /**
+ * @param workerId Worker ID.
+ * @param updateSeq Update sequence for this node.
+ * @param cacheId Cache ID.
+ */
+ GridDhtPartitionSupplyMessageV2(int workerId, long updateSeq, int cacheId, AffinityTopologyVersion topVer) {
+ assert workerId >= 0;
+ assert updateSeq > 0;
+
+ this.cacheId = cacheId;
+ this.updateSeq = updateSeq;
+ this.workerId = workerId;
+ this.topVer = topVer;
+ }
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public GridDhtPartitionSupplyMessageV2() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean allowForStartup() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean ignoreClassErrors() {
+ return true;
+ }
+
+ /**
+ * @return Worker ID.
+ */
+ int workerId() {
+ return workerId;
+ }
+
+ /**
+ * @return Update sequence.
+ */
+ long updateSequence() {
+ return updateSeq;
+ }
+
+ /**
+ * Marks this message for acknowledgment.
+ */
+ void markAck() {
+ ack = true;
+ }
+
+ /**
+ * @return Acknowledgement flag.
+ */
+ boolean ack() {
+ return ack;
+ }
+
+ /**
+ * @return Topology version for which demand message is sent.
+ */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Flag to indicate last message for partition.
+ */
+ Collection<Integer> last() {
+ return last == null ? Collections.<Integer>emptySet() : last;
+ }
+
+ /**
+ * @param p Partition which was fully sent.
+ */
+ void last(int p) {
+ if (last == null)
+ last = new HashSet<>();
+
+ if (last.add(p)) {
+ msgSize += 4;
+
+ // If partition is empty, we need to add it.
+ if (!infos.containsKey(p)) {
+ CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection();
+
+ infoCol.init();
+
+ infos.put(p, infoCol);
+ }
+ }
+ }
+
+ /**
+ * @param p Missed partition.
+ */
+ void missed(int p) {
+ if (missed == null)
+ missed = new HashSet<>();
+
+ if (missed.add(p))
+ msgSize += 4;
+ }
+
+ /**
+ * @return Missed partitions.
+ */
+ Collection<Integer> missed() {
+ return missed == null ? Collections.<Integer>emptySet() : missed;
+ }
+
+ /**
+ * @return Entries.
+ */
+ Map<Integer, CacheEntryInfoCollection> infos() {
+ return infos;
+ }
+
+ /**
+ * @return Message size.
+ */
+ int messageSize() {
+ return msgSize;
+ }
+
+ /**
+ * @param p Partition.
+ * @param info Entry to add.
+ * @param ctx Cache context.
+ * @throws IgniteCheckedException If failed.
+ */
+ void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+ assert info != null;
+
+ marshalInfo(info, ctx);
+
+ msgSize += info.marshalledSize(ctx);
+
+ CacheEntryInfoCollection infoCol = infos.get(p);
+
+ if (infoCol == null) {
+ msgSize += 4;
+
+ infos.put(p, infoCol = new CacheEntryInfoCollection());
+
+ infoCol.init();
+ }
+
+ infoCol.add(info);
+ }
+
+ /**
+ * @param p Partition.
+ * @param info Entry to add.
+ * @param ctx Cache context.
+ * @throws IgniteCheckedException If failed.
+ */
+ void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+ assert info != null;
+ assert (info.key() != null || info.keyBytes() != null);
+ assert info.value() != null;
+
+ // Need to call this method to initialize info properly.
+ marshalInfo(info, ctx);
+
+ msgSize += info.marshalledSize(ctx);
+
+ CacheEntryInfoCollection infoCol = infos.get(p);
+
+ if (infoCol == null) {
+ msgSize += 4;
+
+ infos.put(p, infoCol = new CacheEntryInfoCollection());
+
+ infoCol.init();
+ }
+
+ infoCol.add(info);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
+
+ for (CacheEntryInfoCollection col : infos().values()) {
+ List<GridCacheEntryInfo> entries = col.infos();
+
+ for (int i = 0; i < entries.size(); i++)
+ entries.get(i).unmarshal(cacheCtx, ldr);
+ }
+ }
+
+ /**
+ * @return Number of entries in message.
+ */
+ public int size() {
+ return infos.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeBoolean("ack", ack))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeLong("updateSeq", updateSeq))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeInt("workerId", workerId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ ack = reader.readBoolean("ack");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ last = reader.readCollection("last", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ missed = reader.readCollection("missed", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ updateSeq = reader.readLong("updateSeq");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ workerId = reader.readInt("workerId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 113;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 10;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtPartitionSupplyMessageV2.class, this,
+ "size", size(),
+ "parts", infos.keySet(),
+ "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
deleted file mode 100644
index 0771509..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.util.concurrent.atomic.*;
-
-/**
- *
- */
-public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest {
- /** */
- private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
- private static int TEST_SIZE = 1_024_000;
-
- /** cache name. */
- protected static String CACHE_NAME_DHT = "cache";
-
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return Long.MAX_VALUE;
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration iCfg = super.getConfiguration(gridName);
-
- CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
-
- ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
- ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
-
- if (getTestGridName(3).equals(gridName))
- iCfg.setClientMode(true);
-
- cacheCfg.setName(CACHE_NAME_DHT);
- cacheCfg.setCacheMode(CacheMode.PARTITIONED);
- //cacheCfg.setRebalanceBatchSize(1024);
- cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
- cacheCfg.setRebalanceThreadPoolSize(4);
- //cacheCfg.setRebalanceTimeout(1000000);
- cacheCfg.setBackups(1);
-
- iCfg.setCacheConfiguration(cacheCfg);
- return iCfg;
- }
-
- /**
- * @param ignite Ignite.
- */
- private void generateData(Ignite ignite) {
- try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) {
- for (int i = 0; i < TEST_SIZE; i++) {
- if (i % 1_000_000 == 0)
- log.info("Prepared " + i / 1_000_000 + "m entries.");
-
- stmr.addData(i, i);
- }
- }
- }
-
- /**
- * @param ignite Ignite.
- * @throws IgniteCheckedException
- */
- private void checkData(Ignite ignite) throws IgniteCheckedException {
- for (int i = 0; i < TEST_SIZE; i++) {
- if (i % 1_000_000 == 0)
- log.info("Checked " + i / 1_000_000 + "m entries.");
-
- assert ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i + " does not match";
- }
- }
-
- /**
- * @throws Exception
- */
- public void testMassiveRebalancing() throws Exception {
- Ignite ignite = startGrid(0);
-
- generateData(ignite);
-
- log.info("Preloading started.");
-
- long start = System.currentTimeMillis();
-
- startGrid(1);
-
- startGrid(2);
-
- long spend = (System.currentTimeMillis() - start) / 1000;
-
- IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
- IgniteInternalFuture f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
-
- stopGrid(0);
-
- //TODO: refactor to get futures by topology
- while (f1 == ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture() ||
- f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
- U.sleep(100);
-
- ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
- ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
-
- f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
-
- stopGrid(1);
-
- while (f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
- U.sleep(100);
-
- ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
-
- checkData(grid(2));
-
- log.info("Spend " + spend + " seconds to preload entries.");
-
- stopAllGrids();
- }
-
- /**
- * @throws Exception
- */
- public void testOpPerSecRebalancingTest() throws Exception {
- startGrid(0);
-
- final AtomicBoolean cancelled = new AtomicBoolean(false);
-
- generateData(grid(0));
-
- startGrid(1);
- startGrid(2);
- startGrid(3);
-
- Thread t = new Thread(new Runnable() {
- @Override public void run() {
-
- long spend = 0;
-
- long ops = 0;
-
- while (!cancelled.get()) {
- try {
- long start = System.currentTimeMillis();
-
- int size = 1000;
-
- for (int i = 0; i < size; i++)
- grid(3).cachex(CACHE_NAME_DHT).remove(i);
-
- for (int i = 0; i < size; i++)
- grid(3).cachex(CACHE_NAME_DHT).put(i, i);
-
- spend += System.currentTimeMillis() - start;
-
- ops += size * 2;
- }
- catch (IgniteCheckedException e) {
- e.printStackTrace();
- }
-
- log.info("Ops. per ms: " + ops / spend);
- }
- }
- });
- t.start();
-
- stopGrid(0);
- startGrid(0);
-
- stopGrid(0);
- startGrid(0);
-
- stopGrid(0);
- startGrid(0);
-
- cancelled.set(true);
- t.join();
-
- checkData(grid(3));
-
- //stopAllGrids();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
new file mode 100644
index 0000000..8bcd6d1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+
+/**
+ *
+ */
+public class GridCacheMassiveRebalancingAsyncSelfTest extends GridCacheMassiveRebalancingSyncSelfTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+ CacheConfiguration cacheCfg = iCfg.getCacheConfiguration()[0];
+
+ cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
+
+ return iCfg;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
new file mode 100644
index 0000000..cd12954
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ private static int TEST_SIZE = 1_024_000;
+
+ /** cache name. */
+ protected static String CACHE_NAME_DHT = "cache";
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return Long.MAX_VALUE;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+ CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+
+ if (getTestGridName(3).equals(gridName))
+ iCfg.setClientMode(true);
+
+ cacheCfg.setName(CACHE_NAME_DHT);
+ cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+ //cacheCfg.setRebalanceBatchSize(1024);
+ //cacheCfg.setRebalanceBatchesCount(1);
+ cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cacheCfg.setRebalanceThreadPoolSize(4);
+ //cacheCfg.setRebalanceTimeout(1000000);
+ cacheCfg.setBackups(1);
+
+ iCfg.setCacheConfiguration(cacheCfg);
+ return iCfg;
+ }
+
+ /**
+ * @param ignite Ignite.
+ */
+ protected void generateData(Ignite ignite) {
+ try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) {
+ for (int i = 0; i < TEST_SIZE; i++) {
+ if (i % 1_000_000 == 0)
+ log.info("Prepared " + i / 1_000_000 + "m entries.");
+
+ stmr.addData(i, i);
+ }
+ }
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @throws IgniteCheckedException
+ */
+ protected void checkData(Ignite ignite) throws IgniteCheckedException {
+ for (int i = 0; i < TEST_SIZE; i++) {
+ if (i % 1_000_000 == 0)
+ log.info("Checked " + i / 1_000_000 + "m entries.");
+
+ assert ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i + " does not match";
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testSimpleRebalancing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ generateData(ignite);
+
+ log.info("Preloading started.");
+
+ long start = System.currentTimeMillis();
+
+ startGrid(1);
+
+ IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+
+ f1.get();
+
+ long spend = (System.currentTimeMillis() - start) / 1000;
+
+ stopGrid(0);
+
+ checkData(grid(1));
+
+ log.info("Spend " + spend + " seconds to preload entries.");
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testComplexRebalancing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ generateData(ignite);
+
+ log.info("Preloading started.");
+
+ long start = System.currentTimeMillis();
+
+ startGrid(1);
+ startGrid(2);
+
+ IgniteInternalFuture f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+ f2.get();
+
+ IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+ while (!((GridDhtPartitionDemander.SyncFuture)f1).topologyVersion().equals(((GridDhtPartitionDemander.SyncFuture)f2).topologyVersion())) {
+ U.sleep(100);
+
+ f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+ }
+ f1.get();
+
+ long spend = (System.currentTimeMillis() - start) / 1000;
+
+ f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+ f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+
+ stopGrid(0);
+
+ //TODO: refactor to get futures by topology
+ while (f1 == ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture() ||
+ f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
+ U.sleep(100);
+
+ ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+ ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+
+ f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+
+ stopGrid(1);
+
+ while (f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture())
+ U.sleep(100);
+
+ ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get();
+
+ checkData(grid(2));
+
+ log.info("Spend " + spend + " seconds to preload entries.");
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void _testOpPerSecRebalancingTest() throws Exception {
+ startGrid(0);
+
+ final AtomicBoolean cancelled = new AtomicBoolean(false);
+
+ generateData(grid(0));
+
+ startGrid(1);
+ startGrid(2);
+ startGrid(3);
+
+ Thread t = new Thread(new Runnable() {
+ @Override public void run() {
+
+ long spend = 0;
+
+ long ops = 0;
+
+ while (!cancelled.get()) {
+ try {
+ long start = System.currentTimeMillis();
+
+ int size = 1000;
+
+ for (int i = 0; i < size; i++)
+ grid(3).cachex(CACHE_NAME_DHT).remove(i);
+
+ for (int i = 0; i < size; i++)
+ grid(3).cachex(CACHE_NAME_DHT).put(i, i);
+
+ spend += System.currentTimeMillis() - start;
+
+ ops += size * 2;
+ }
+ catch (IgniteCheckedException e) {
+ e.printStackTrace();
+ }
+
+ log.info("Ops. per ms: " + ops / spend);
+ }
+ }
+ });
+ t.start();
+
+ stopGrid(0);
+ startGrid(0);
+
+ stopGrid(0);
+ startGrid(0);
+
+ stopGrid(0);
+ startGrid(0);
+
+ cancelled.set(true);
+ t.join();
+
+ checkData(grid(3));
+
+ //stopAllGrids();
+ }
+}
\ No newline at end of file