You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/28 14:49:59 UTC
[41/41] ignite git commit: ignite-1093
ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d89f1b0a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d89f1b0a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d89f1b0a
Branch: refs/heads/ignite-1093
Commit: d89f1b0af5ed23bc0f6e68fa9f0d377e6be41fcc
Parents: a7ddb62
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Aug 28 15:37:14 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Aug 28 15:37:14 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 1 +
.../ignite/internal/IgniteNodeAttributes.java | 3 +
.../GridCachePartitionExchangeManager.java | 3 +-
.../dht/preloader/GridDhtPartitionDemander.java | 709 ++++++++++++++++---
.../dht/preloader/GridDhtPartitionSupplier.java | 290 +++++++-
.../dht/preloader/GridDhtPreloader.java | 10 +-
...ridCacheMassiveRebalancingAsyncSelfTest.java | 91 ---
...GridCacheMassiveRebalancingSyncSelfTest.java | 392 ----------
.../GridCacheRebalancingAsyncSelfTest.java | 85 +++
.../GridCacheRebalancingSyncSelfTest.java | 269 +++++++
10 files changed, 1267 insertions(+), 586 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 1db73bf..03110c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1170,6 +1170,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
add(ATTR_MARSHALLER, cfg.getMarshaller().getClass().getName());
add(ATTR_USER_NAME, System.getProperty("user.name"));
add(ATTR_GRID_NAME, gridName);
+ add(REBALANCING_VERSION, 1);
add(ATTR_PEER_CLASSLOADING, cfg.isPeerClassLoadingEnabled());
add(ATTR_DEPLOYMENT_MODE, cfg.getDeploymentMode());
http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index 10b8df0..c04c69b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -135,6 +135,9 @@ public final class IgniteNodeAttributes {
/** Node consistent id. */
public static final String ATTR_NODE_CONSISTENT_ID = ATTR_PREFIX + ".consistent.id";
+ /** Rebalancing version id. */
+ public static final String REBALANCING_VERSION = ATTR_PREFIX + ".rebalancing.version";
+
/**
* Enforces singleton.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 003e8db..bf77d1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -282,7 +282,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
cctx.io().addOrderedHandler(demanderTopic(cnt), new CI2<UUID, GridDhtPartitionSupplyMessageV2>() {
@Override public void apply(final UUID id, final GridDhtPartitionSupplyMessageV2 m) {
- enterBusy();
+ if (!enterBusy())
+ return;
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 0474bf9..0aa30b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -36,12 +36,17 @@ import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
import org.jsr166.*;
import java.util.*;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
import static org.apache.ignite.internal.processors.dr.GridDrType.*;
@@ -69,13 +74,17 @@ public class GridDhtPartitionDemander {
/** Last exchange future. */
private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
+ /** Demand lock. */
+ private final ReadWriteLock demandLock;
+
/**
* @param cctx Cache context.
*/
- public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) {
+ public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock demandLock) {
assert cctx != null;
this.cctx = cctx;
+ this.demandLock = demandLock;
log = cctx.logger(getClass());
@@ -199,7 +208,13 @@ public class GridDhtPartitionDemander {
else
fut.init(assigns);
- if (assigns.isEmpty() || topologyChanged(topVer)) {
+ if (assigns.isEmpty()) {
+ fut.onDone();
+
+ return;
+ }
+
+ if (topologyChanged(topVer)) {
fut.onCancel();
return;
@@ -225,13 +240,17 @@ public class GridDhtPartitionDemander {
}
}
catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
"[cacheName=" + cctx.name() + ']');
+ cSF.onCancel();
- return;
+ return;
+ }
}
catch (IgniteCheckedException e) {
+ cSF.onCancel();
+
throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
}
}
@@ -257,18 +276,22 @@ public class GridDhtPartitionDemander {
}
}
catch (IgniteInterruptedCheckedException ignored) {
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Failed to wait for ordered rebalance future (grid is stopping): " +
"[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']');
+ cSF.onCancel();
- return;
+ return;
+ }
}
catch (IgniteCheckedException e) {
+ cSF.onCancel();
+
throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e);
}
}
- requestPartitions(assigns, topVer, cSF);
+ requestPartitions(cSF);
}
}).start();
@@ -300,12 +323,13 @@ public class GridDhtPartitionDemander {
}
/**
- * @param assigns Assigns.
+ * @param fut Future.
*/
- private void requestPartitions(
- final GridDhtPreloaderAssignments assigns,
- AffinityTopologyVersion topVer,
- SyncFuture fut) {
+ private void requestPartitions(SyncFuture fut) {
+ final GridDhtPreloaderAssignments assigns = fut.assigns;
+
+ AffinityTopologyVersion topVer = fut.topologyVersion();
+
for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
if (topologyChanged(topVer)) {
fut.onCancel();
@@ -313,73 +337,77 @@ public class GridDhtPartitionDemander {
return;
}
+ final ClusterNode node = e.getKey();
+
GridDhtPartitionDemandMessage d = e.getValue();
d.timeout(cctx.config().getRebalanceTimeout());
d.workerId(0);//old api support.
- final ClusterNode node = e.getKey();
+ final CacheConfiguration cfg = cctx.config();
final long start = U.currentTimeMillis();
- final CacheConfiguration cfg = cctx.config();
+ fut.logStart(node.id(), start);
- final AffinityTopologyVersion top = d.topologyVersion();
+ U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
+ ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
- if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
- U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
- ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]");
+ //Check remote node rebalancing API version.
+ if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) {
+ GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
- fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
- @Override public void apply(IgniteInternalFuture<Boolean> t) {
- Boolean completed = ((SyncFuture)t).isCompleted();
- U.log(log, (!completed ? "Cancelled" : "Completed") + " rebalancing [cache=" + cctx.name() + ", mode="
- + cfg.getRebalanceMode() + ", from node=" + node.id() + ", topology=" + top +
- ", time=" + (U.currentTimeMillis() - start) + " ms]");
- }
- });
- }
+ remainings.addAll(d.partitions());
- GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>();
+ fut.append(node.id(), remainings);
- remainings.addAll(d.partitions());
+ int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2);
- fut.append(node.id(), remainings);
+ List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
- int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2);
+ for (int cnt = 0; cnt < lsnrCnt; cnt++)
+ sParts.add(new HashSet<Integer>());
- List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+ Iterator<Integer> it = d.partitions().iterator();
- for (int cnt = 0; cnt < lsnrCnt; cnt++)
- sParts.add(new HashSet<Integer>());
+ int cnt = 0;
- Iterator<Integer> it = d.partitions().iterator();
+ while (it.hasNext())
+ sParts.get(cnt++ % lsnrCnt).add(it.next());
- int cnt = 0;
+ for (cnt = 0; cnt < lsnrCnt; cnt++) {
- while (it.hasNext())
- sParts.get(cnt++ % lsnrCnt).add(it.next());
+ if (!sParts.get(cnt).isEmpty()) {
- for (cnt = 0; cnt < lsnrCnt; cnt++) {
+ // Create copy.
+ GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
- if (!sParts.get(cnt).isEmpty()) {
+ initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt));
- // Create copy.
- GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+ try {
+ if (!topologyChanged(topVer))
+ cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
+ else
+ fut.onCancel();
+ }
+ catch (IgniteCheckedException ex) {
+ fut.onCancel();
- initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt));
+ U.error(log, "Failed to send partition demand message to node", ex);
+ }
- try {
- cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to send partition demand message to local node", ex);
+ if (log.isDebugEnabled())
+ log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]");
}
-
- if (log.isDebugEnabled())
- log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]");
}
}
+ else {
+ DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut);
+
+ fut.append(node.id(), d.partitions());
+
+ dw.run(node, d);
+ }
}
}
@@ -445,7 +473,7 @@ public class GridDhtPartitionDemander {
final SyncFuture fut = syncFut;
if (topologyChanged(topVer)) {
- fut.onCancel(id, topVer);
+ fut.onCancel();
return;
}
@@ -462,7 +490,7 @@ public class GridDhtPartitionDemander {
if (log.isDebugEnabled())
log.debug("Class got undeployed during preloading: " + supply.classError());
- fut.onCancel(id, topVer);
+ fut.onCancel(id);
return;
}
@@ -515,7 +543,7 @@ public class GridDhtPartitionDemander {
if (last) {
top.own(part);
- fut.onPartitionDone(id, p, topVer);
+ fut.onPartitionDone(id, p);
if (log.isDebugEnabled())
log.debug("Finished rebalancing partition: " + part);
@@ -527,14 +555,14 @@ public class GridDhtPartitionDemander {
}
}
else {
- fut.onPartitionDone(id, p, topVer);
+ fut.onPartitionDone(id, p);
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
}
}
else {
- fut.onPartitionDone(id, p, topVer);
+ fut.onPartitionDone(id, p);
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
@@ -544,10 +572,10 @@ public class GridDhtPartitionDemander {
// Only request partitions based on latest topology version.
for (Integer miss : supply.missed())
if (cctx.affinity().localNode(miss, topVer))
- fut.onMissedPartition(id, miss, topVer);
+ fut.onMissedPartition(id, miss);
for (Integer miss : supply.missed())
- fut.onPartitionDone(id, miss, topVer);
+ fut.onPartitionDone(id, miss);
if (!fut.isDone()) {
@@ -569,15 +597,15 @@ public class GridDhtPartitionDemander {
}
catch (ClusterTopologyCheckedException e) {
if (log.isDebugEnabled())
- log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
+ log.debug("Node left during rebalancing [node=" + node.id() +
", msg=" + e.getMessage() + ']');
- fut.onCancel(id, topVer);
+ fut.onCancel();
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to receive partitions from node (rebalancing will not " +
"fully finish) [node=" + node.id() + ", msg=" + supply + ']', ex);
- fut.onCancel(id, topVer);
+ fut.onCancel(node.id());
}
}
@@ -687,6 +715,10 @@ public class GridDhtPartitionDemander {
private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>();
+ private ConcurrentHashMap8<UUID, Long> started = new ConcurrentHashMap8<>();
+
+ private Lock lock = new ReentrantLock();
+
private volatile GridLocalEventListener lsnr;
/** Assignments. */
@@ -694,14 +726,23 @@ public class GridDhtPartitionDemander {
private volatile boolean completed = true;
+ /**
+ * @param assigns Assigns.
+ */
SyncFuture(GridDhtPreloaderAssignments assigns) {
this.assigns = assigns;
}
+ /**
+ *
+ */
public AffinityTopologyVersion topologyVersion() {
return assigns != null ? assigns.topologyVersion() : null;
}
+ /**
+ * @param assigns Assigns.
+ */
void init(GridDhtPreloaderAssignments assigns) {
final SyncFuture fut = this;
@@ -716,80 +757,157 @@ public class GridDhtPartitionDemander {
this.assigns = assigns;
}
+ /**
+ *
+ */
boolean isInited() {
return assigns != null;
}
+ /**
+ * @param nodeId Node id.
+ * @param parts Parts.
+ */
void append(UUID nodeId, Collection<Integer> parts) {
remaining.put(nodeId, parts);
missed.put(nodeId, new GridConcurrentHashSet<Integer>());
}
+ /**
+ * @param nodeId Node id.
+ * @param time Time.
+ */
+ void logStart(UUID nodeId, long time) {
+ started.put(nodeId, time);
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @param node Node.
+ */
GridDhtPartitionDemandMessage getDemandMessage(AffinityTopologyVersion topVer, ClusterNode node) {
- if (!topVer.equals(assigns.topologyVersion()))
+ if (isDone() || !topVer.equals(assigns.topologyVersion()))
return null;
return assigns.get(node);
}
+ /**
+ *
+ */
void onCancel() {
- remaining.clear();
+ lock.lock();
+ try {
+ if (isDone())
+ return;
+
+ remaining.clear();
- completed = false;
+ completed = false;
- checkIsDone();
+ U.log(log, (!completed ? "Cancelled" : "Completed") + " rebalancing from all nodes [cache=" + cctx.name()
+ + ", topology=" + topologyVersion() +
+ ", time=" +
+ (started.isEmpty() ? 0 : (U.currentTimeMillis() - Collections.min(started.values()))) + " ms]");
+
+ checkIsDone();
+ }
+ finally {
+ lock.unlock();
+ }
}
- void onCancel(UUID nodeId, AffinityTopologyVersion topVer) {
- if (isDone() || !topVer.equals(assigns.topologyVersion()))
- return;
+ /**
+ * @param nodeId Node id.
+ */
+ void onCancel(UUID nodeId) {
+ lock.lock();
+ try {
+ if (isDone())
+ return;
- remaining.remove(nodeId);
+ remaining.remove(nodeId);
- completed = false;
+ completed = false;
+
+ U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+ ", from node=" + nodeId + ", topology=" + topologyVersion() +
+ ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]"));
+
+ checkIsDone();
+ }
+ finally {
+ lock.unlock();
+ }
- checkIsDone();
}
+ /**
+ * @return Is completed.
+ */
boolean isCompleted() {
return completed;
}
- void onMissedPartition(UUID nodeId, int p, AffinityTopologyVersion topVer) {
- if (isDone() || !topVer.equals(assigns.topologyVersion()))
- return;
+ /**
+ * @param nodeId Node id.
+ * @param p P.
+ */
+ void onMissedPartition(UUID nodeId, int p) {
+ lock.lock();
+ try {
+ if (isDone())
+ return;
- if (missed.get(nodeId) == null)
- missed.put(nodeId, new GridConcurrentHashSet<Integer>());
+ if (missed.get(nodeId) == null)
+ missed.put(nodeId, new GridConcurrentHashSet<Integer>());
- missed.get(nodeId).add(p);
+ missed.get(nodeId).add(p);
+ }
+ finally {
+ lock.unlock();
+ }
}
- void onPartitionDone(UUID nodeId, int p, AffinityTopologyVersion topVer) {
- if (isDone() || !topVer.equals(assigns.topologyVersion()))
- return;
+ /**
+ * @param nodeId Node id.
+ * @param p P.
+ */
+ void onPartitionDone(UUID nodeId, int p) {
+ lock.lock();
+ try {
+ if (isDone())
+ return;
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
- preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
- assigns.exchangeFuture().discoveryEvent());
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+ preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+ assigns.exchangeFuture().discoveryEvent());
- Collection<Integer> parts = remaining.get(nodeId);
+ Collection<Integer> parts = remaining.get(nodeId);
- if (parts != null) {
- parts.remove(p);
+ if (parts != null) {
+ parts.remove(p);
- if (parts.isEmpty()) {
- remaining.remove(nodeId);
+ if (parts.isEmpty()) {
+ remaining.remove(nodeId);
- if (log.isDebugEnabled())
- log.debug("Completed full partition iteration for node [nodeId=" + nodeId + ']');
+ U.log(log, ("Completed rebalancing [cache=" + cctx.name() +
+ ", from node=" + nodeId + ", topology=" + topologyVersion() +
+ ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]"));
+ }
}
- }
- checkIsDone();
+ checkIsDone();
+ }
+ finally {
+ lock.unlock();
+ }
}
+ /**
+ *
+ */
private void checkIsDone() {
if (remaining.isEmpty()) {
if (log.isDebugEnabled())
@@ -809,8 +927,6 @@ public class GridDhtPartitionDemander {
cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture());
}
- missed.clear();
-
cctx.shared().exchange().scheduleResendPartitions();
if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED))
@@ -820,7 +936,412 @@ public class GridDhtPartitionDemander {
cctx.events().removeListener(lsnr);
onDone(completed);
+
+ missed.clear();
+ remaining.clear();
+ started.clear();
+ assigns.clear();
+ }
+ }
+ }
+
+ /**
+ * Supply message wrapper.
+ */
+ @Deprecated//Backward compatibility. To be removed in future.
+ private static class SupplyMessage {
+ /** Sender ID. */
+ private UUID sndId;
+
+ /** Supply message. */
+ private GridDhtPartitionSupplyMessage supply;
+
+ /**
+ * Dummy constructor.
+ */
+ private SupplyMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param sndId Sender ID.
+ * @param supply Supply message.
+ */
+ SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
+ this.sndId = sndId;
+ this.supply = supply;
+ }
+
+ /**
+ * @return Sender ID.
+ */
+ UUID senderId() {
+ return sndId;
+ }
+
+ /**
+ * @return Message.
+ */
+ GridDhtPartitionSupplyMessage supply() {
+ return supply;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SupplyMessage.class, this);
+ }
+ }
+
+ /** DemandWorker index. */
+ @Deprecated//Backward compatibility. To be removed in future.
+ private final AtomicInteger dmIdx = new AtomicInteger();
+
+ /**
+ *
+ */
+ @Deprecated//Backward compatibility. To be removed in future.
+ private class DemandWorker {
+ /** Worker ID. */
+ private int id;
+
+ /** Partition-to-node assignments. */
+ private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
+
+ /** Message queue. */
+ private final LinkedBlockingDeque<SupplyMessage> msgQ =
+ new LinkedBlockingDeque<>();
+
+ /** Counter. */
+ private long cntr;
+
+ /** Hide worker logger and use cache logger instead. */
+ private IgniteLogger log = GridDhtPartitionDemander.this.log;
+
+ private volatile SyncFuture fut;
+
+ /**
+ * @param id Worker ID.
+ */
+ private DemandWorker(int id, SyncFuture fut) {
+ assert id >= 0;
+
+ this.id = id;
+ this.fut = fut;
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void addMessage(SupplyMessage msg) {
+ msgQ.offer(msg);
+ }
+
+ /**
+ * @param deque Deque to poll from.
+ * @param time Time to wait.
+ * @return Polled item.
+ * @throws InterruptedException If interrupted.
+ */
+ @Nullable private <T> T poll(BlockingQueue<T> deque, long time) throws InterruptedException {
+ return deque.poll(time, MILLISECONDS);
+ }
+
+ /**
+ * @param idx Unique index for this topic.
+ * @return Topic for partition.
+ */
+ public Object topic(long idx) {
+ return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
+ }
+
+ /**
+ * @param node Node to demand from.
+ * @param topVer Topology version.
+ * @param d Demand message.
+ * @param exchFut Exchange future.
+ * @return Missed partitions.
+ * @throws InterruptedException If interrupted.
+ * @throws ClusterTopologyCheckedException If node left.
+ * @throws IgniteCheckedException If failed to send message.
+ */
+ private Set<Integer> demandFromNode(
+ ClusterNode node,
+ final AffinityTopologyVersion topVer,
+ GridDhtPartitionDemandMessage d,
+ GridDhtPartitionsExchangeFuture exchFut
+ ) throws InterruptedException, IgniteCheckedException {
+ GridDhtPartitionTopology top = cctx.dht().topology();
+
+ cntr++;
+
+ d.topic(topic(cntr));
+ d.workerId(id);
+
+ Set<Integer> missed = new HashSet<>();
+
+ // Get the same collection that will be sent in the message.
+ Collection<Integer> remaining = d.partitions();
+
+ if (topologyChanged(topVer))
+ return missed;
+
+ cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+ @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
+ addMessage(new SupplyMessage(nodeId, msg));
+ }
+ });
+
+ try {
+ boolean retry;
+
+ // DoWhile.
+ // =======
+ do {
+ retry = false;
+
+ // Create copy.
+ d = new GridDhtPartitionDemandMessage(d, remaining);
+
+ long timeout = cctx.config().getRebalanceTimeout();
+
+ d.timeout(timeout);
+
+ if (log.isDebugEnabled())
+ log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
+
+ // Send demand message.
+ cctx.io().send(node, d, cctx.ioPolicy());
+
+ // While.
+ // =====
+ while (!topologyChanged(topVer)) {
+ SupplyMessage s = poll(msgQ, timeout);
+
+ // If timed out.
+ if (s == null) {
+ if (msgQ.isEmpty()) { // Safety check.
+ U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
+ " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
+ " configuration properties).");
+
+ // Ordered listener was removed if timeout expired.
+ cctx.io().removeOrderedHandler(d.topic());
+
+ // Must create copy to be able to work with IO manager thread local caches.
+ d = new GridDhtPartitionDemandMessage(d, remaining);
+
+ // Create new topic.
+ d.topic(topic(++cntr));
+
+ // Create new ordered listener.
+ cctx.io().addOrderedHandler(d.topic(),
+ new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+ @Override public void apply(UUID nodeId,
+ GridDhtPartitionSupplyMessage msg) {
+ addMessage(new SupplyMessage(nodeId, msg));
+ }
+ });
+
+ // Resend message with larger timeout.
+ retry = true;
+
+ break; // While.
+ }
+ else
+ continue; // While.
+ }
+
+ // Check that message was received from expected node.
+ if (!s.senderId().equals(node.id())) {
+ U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
+ ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
+
+ continue; // While.
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Received supply message: " + s);
+
+ GridDhtPartitionSupplyMessage supply = s.supply();
+
+ // Check whether there were class loading errors on unmarshal
+ if (supply.classError() != null) {
+ if (log.isDebugEnabled())
+ log.debug("Class got undeployed during preloading: " + supply.classError());
+
+ retry = true;
+
+ // Quit preloading.
+ break;
+ }
+
+ // Preload.
+ for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
+ int p = e.getKey();
+
+ if (cctx.affinity().localNode(p, topVer)) {
+ GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+ assert part != null;
+
+ if (part.state() == MOVING) {
+ boolean reserved = part.reserve();
+
+ assert reserved : "Failed to reserve partition [gridName=" +
+ cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+
+ part.lock();
+
+ try {
+ Collection<Integer> invalidParts = new GridLeanSet<>();
+
+ // Loop through all received entries and try to preload them.
+ for (GridCacheEntryInfo entry : e.getValue().infos()) {
+ if (!invalidParts.contains(p)) {
+ if (!part.preloadingPermitted(entry.key(), entry.version())) {
+ if (log.isDebugEnabled())
+ log.debug("Preloading is not permitted for entry due to " +
+ "evictions [key=" + entry.key() +
+ ", ver=" + entry.version() + ']');
+
+ continue;
+ }
+
+ if (!preloadEntry(node, p, entry, topVer)) {
+ invalidParts.add(p);
+
+ if (log.isDebugEnabled())
+ log.debug("Got entries for invalid partition during " +
+ "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
+ }
+ }
+ }
+
+ boolean last = supply.last().contains(p);
+
+ // If message was last for this partition,
+ // then we take ownership.
+ if (last) {
+ remaining.remove(p);
+ fut.onPartitionDone(node.id(), p);
+
+ top.own(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Finished rebalancing partition: " + part);
+
+ if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+ preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+ exchFut.discoveryEvent());
+ }
+ }
+ finally {
+ part.unlock();
+ part.release();
+ }
+ }
+ else {
+ remaining.remove(p);
+ fut.onPartitionDone(node.id(), p);
+
+ if (log.isDebugEnabled())
+ log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
+ }
+ }
+ else {
+ remaining.remove(p);
+ fut.onPartitionDone(node.id(), p);
+
+ if (log.isDebugEnabled())
+ log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
+ }
+ }
+
+ remaining.removeAll(s.supply().missed());
+
+ // Only request partitions based on latest topology version.
+ for (Integer miss : s.supply().missed()) {
+ if (cctx.affinity().localNode(miss, topVer))
+ missed.add(miss);
+
+ fut.onMissedPartition(node.id(), miss);
+ }
+
+ if (remaining.isEmpty())
+ break; // While.
+
+ if (s.supply().ack()) {
+ retry = true;
+
+ break;
+ }
+ }
+ }
+ while (retry && !topologyChanged(topVer));
+
+ return missed;
+ }
+ finally {
+ cctx.io().removeOrderedHandler(d.topic());
+ }
+ }
+
+ /**
+ * @param node Node.
+ * @param d D.
+ */
+ public void run(ClusterNode node, GridDhtPartitionDemandMessage d) {
+ demandLock.readLock().lock();
+
+ try {
+ GridDhtPartitionsExchangeFuture exchFut = fut.assigns.exchangeFuture();
+
+ AffinityTopologyVersion topVer = fut.assigns.topologyVersion();
+
+ Collection<Integer> missed = new HashSet<>();
+
+ if (topologyChanged(topVer)) {
+ fut.onCancel();
+
+ return;
+ }
+
+ try {
+ Set<Integer> set = demandFromNode(node, topVer, d, exchFut);
+
+ if (!set.isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" +
+ set + ']');
+
+ missed.addAll(set);
+ }
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Node left during rebalancing (will retry) [node=" + node.id() +
+ ", msg=" + e.getMessage() + ']');
+
+ fut.onCancel();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to receive partitions from node (rebalancing will not " +
+ "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
+
+ fut.onCancel(node.id());
+ }
+ catch (InterruptedException e) {
+ fut.onCancel();
+ }
}
+ finally {
+ demandLock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 347a394..0686376 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -25,15 +25,12 @@ import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.jsr166.*;
import java.util.*;
-import java.util.concurrent.locks.*;
-import static org.apache.ignite.internal.GridTopic.*;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
/**
@@ -73,13 +70,14 @@ class GridDhtPartitionSupplier {
top = cctx.dht().topology();
- depEnabled = cctx.gridDeploy().enabled();
+ depEnabled = cctx.gridDeploy().enabled();
}
/**
*
*/
void start() {
+ startOldListeners();
}
/**
@@ -463,14 +461,14 @@ class GridDhtPartitionSupplier {
int phase,
Iterator<Integer> partIt,
int part,
- Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr){
+ Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr) {
scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
}
/**
* Supply context.
*/
- private static class SupplyContext{
+ private static class SupplyContext {
/** Phase. */
private int phase;
@@ -502,4 +500,284 @@ class GridDhtPartitionSupplier {
this.part = part;
}
}
+
+ @Deprecated//Backward compatibility. To be removed in future.
+ public void startOldListeners() {
+ if (!cctx.kernalContext().clientNode()) {
+ int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
+
+ cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
+ @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+ processOldDemandMessage(m, id);
+ }
+ });
+ }
+ }
+
+ /**
+ * @param d D.
+ * @param id Id.
+ */
+ @Deprecated//Backward compatibility. To be removed in future.
+ private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) {
+ GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
+ d.updateSequence(), cctx.cacheId());
+
+ ClusterNode node = cctx.node(id);
+
+ long preloadThrottle = cctx.config().getRebalanceThrottle();
+
+ boolean ack = false;
+
+ try {
+ for (int part : d.partitions()) {
+ GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+
+ if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+ // Reply with partition of "-1" to let sender know that
+ // this node is no longer an owner.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Requested partition is not owned by local node [part=" + part +
+ ", demander=" + id + ']');
+
+ continue;
+ }
+
+ GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+ try {
+ if (cctx.isSwapOrOffheapEnabled()) {
+ swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+ cctx.swap().addOffHeapListener(part, swapLsnr);
+ cctx.swap().addSwapListener(part, swapLsnr);
+ }
+
+ boolean partMissing = false;
+
+ for (GridCacheEntryEx e : loc.entries()) {
+ if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ // Demander no longer needs this partition, so we send '-1' partition and move on.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Demanding node does not need requested partition [part=" + part +
+ ", nodeId=" + id + ']');
+
+ partMissing = true;
+
+ break;
+ }
+
+ if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ ack = true;
+
+ if (!replyOld(node, d, s))
+ return;
+
+ // Throttle preloading.
+ if (preloadThrottle > 0)
+ U.sleep(preloadThrottle);
+
+ s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+ cctx.cacheId());
+ }
+
+ GridCacheEntryInfo info = e.info();
+
+ if (info != null && !info.isNew()) {
+ if (preloadPred == null || preloadPred.apply(info))
+ s.addEntry(part, info, cctx);
+ else if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+ info);
+ }
+ }
+
+ if (partMissing)
+ continue;
+
+ if (cctx.isSwapOrOffheapEnabled()) {
+ GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+ cctx.swap().iterator(part);
+
+ // Iterator may be null if space does not exist.
+ if (iter != null) {
+ try {
+ boolean prepared = false;
+
+ for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) {
+ if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ // Demander no longer needs this partition,
+ // so we send '-1' partition and move on.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Demanding node does not need requested partition " +
+ "[part=" + part + ", nodeId=" + id + ']');
+
+ partMissing = true;
+
+ break; // For.
+ }
+
+ if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ ack = true;
+
+ if (!replyOld(node, d, s))
+ return;
+
+ // Throttle preloading.
+ if (preloadThrottle > 0)
+ U.sleep(preloadThrottle);
+
+ s = new GridDhtPartitionSupplyMessage(d.workerId(),
+ d.updateSequence(), cctx.cacheId());
+ }
+
+ GridCacheSwapEntry swapEntry = e.getValue();
+
+ GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+ info.keyBytes(e.getKey());
+ info.ttl(swapEntry.ttl());
+ info.expireTime(swapEntry.expireTime());
+ info.version(swapEntry.version());
+ info.value(swapEntry.value());
+
+ if (preloadPred == null || preloadPred.apply(info))
+ s.addEntry0(part, info, cctx);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false (will not send " +
+ "cache entry): " + info);
+
+ continue;
+ }
+
+ // Need to manually prepare cache message.
+ if (depEnabled && !prepared) {
+ ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+ cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+ swapEntry.valueClassLoaderId() != null ?
+ cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+ null;
+
+ if (ldr == null)
+ continue;
+
+ if (ldr instanceof GridDeploymentInfo) {
+ s.prepare((GridDeploymentInfo)ldr);
+
+ prepared = true;
+ }
+ }
+ }
+
+ if (partMissing)
+ continue;
+ }
+ finally {
+ iter.close();
+ }
+ }
+ }
+
+ // Stop receiving promote notifications.
+ if (swapLsnr != null) {
+ cctx.swap().removeOffHeapListener(part, swapLsnr);
+ cctx.swap().removeSwapListener(part, swapLsnr);
+ }
+
+ if (swapLsnr != null) {
+ Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+ swapLsnr = null;
+
+ for (GridCacheEntryInfo info : entries) {
+ if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+ // Demander no longer needs this partition,
+ // so we send '-1' partition and move on.
+ s.missed(part);
+
+ if (log.isDebugEnabled())
+ log.debug("Demanding node does not need requested partition " +
+ "[part=" + part + ", nodeId=" + id + ']');
+
+ // No need to continue iteration over swap entries.
+ break;
+ }
+
+ if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ ack = true;
+
+ if (!replyOld(node, d, s))
+ return;
+
+ s = new GridDhtPartitionSupplyMessage(d.workerId(),
+ d.updateSequence(),
+ cctx.cacheId());
+ }
+
+ if (preloadPred == null || preloadPred.apply(info))
+ s.addEntry(part, info, cctx);
+ else if (log.isDebugEnabled())
+ log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+ info);
+ }
+ }
+
+ // Mark as last supply message.
+ s.last(part);
+
+ if (ack) {
+ s.markAck();
+
+ break; // Partition for loop.
+ }
+ }
+ finally {
+ loc.release();
+
+ if (swapLsnr != null) {
+ cctx.swap().removeOffHeapListener(part, swapLsnr);
+ cctx.swap().removeSwapListener(part, swapLsnr);
+ }
+ }
+ }
+
+ replyOld(node, d, s);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send partition supply message to node: " + node.id(), e);
+ }
+ }
+
+ /**
+ * @param n Node.
+ * @param d Demand message.
+ * @param s Supply message.
+ * @return {@code True} if message was sent, {@code false} if recipient left grid.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Deprecated//Backward compatibility. To be removed in future.
+ private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+ throws IgniteCheckedException {
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+ cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+ return true;
+ }
+ catch (ClusterTopologyCheckedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+ return false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 585566b..7a9deba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -163,7 +163,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
});
supplier = new GridDhtPartitionSupplier(cctx);
- demander = new GridDhtPartitionDemander(cctx);
+ demander = new GridDhtPartitionDemander(cctx, demandLock);
cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
@@ -350,7 +350,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) {
- demander.handleSupplyMessage(idx, id, s);
+ demandLock.readLock().lock();
+ try {
+ demander.handleSupplyMessage(idx, id, s);
+ }
+ finally {
+ demandLock.readLock().unlock();
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
deleted file mode 100644
index ca564ed..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-
-/**
- *
- */
-public class GridCacheMassiveRebalancingAsyncSelfTest extends GridCacheMassiveRebalancingSyncSelfTest {
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration iCfg = super.getConfiguration(gridName);
-
- CacheConfiguration cacheCfg = iCfg.getCacheConfiguration()[0];
-
- cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
-
- iCfg.setDiscoverySpi(new FailableTcpDiscoverySpi());
-
- ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
- ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
-
- if (getTestGridName(20).equals(gridName))
- spi =(FailableTcpDiscoverySpi)iCfg.getDiscoverySpi();
-
- return iCfg;
- }
-
- public static class FailableTcpDiscoverySpi extends TcpDiscoverySpi{
- public void fail(){
- simulateNodeFailure();
- }
- }
-
- private volatile FailableTcpDiscoverySpi spi;
-
- /**
- * @throws Exception
- */
- public void testNodeFailedAtRebalancing() throws Exception {
- Ignite ignite = startGrid(0);
-
- generateData(ignite);
-
- log.info("Preloading started.");
-
- startGrid(1);
-
- IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
-
- f1.get();
-
- startGrid(20);
-
- U.sleep(500);
-
- spi.fail();
-
- U.sleep(500);
-
- f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
- IgniteInternalFuture f0 = ((GridCacheAdapter)grid(0).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
-
- f1.get();
- f0.get();
-
- stopAllGrids();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
deleted file mode 100644
index f69b710..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.util.concurrent.atomic.*;
-
-/**
- *
- */
-public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractTest {
- /** */
- protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
- private static int TEST_SIZE = 1_000_000;
-
- /** cache name. */
- protected static String CACHE_NAME_DHT = "cache";
-
- /** cache 2 name. */
- protected static String CACHE_2_NAME_DHT = "cache2";
-
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return Long.MAX_VALUE;
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration iCfg = super.getConfiguration(gridName);
-
- ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
- ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
-
- if (getTestGridName(10).equals(gridName))
- iCfg.setClientMode(true);
-
- CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
-
- cacheCfg.setName(CACHE_NAME_DHT);
- cacheCfg.setCacheMode(CacheMode.PARTITIONED);
- //cacheCfg.setRebalanceBatchSize(1024);
- //cacheCfg.setRebalanceBatchesCount(1);
- cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
- cacheCfg.setBackups(1);
-
- CacheConfiguration<Integer, Integer> cacheCfg2 = new CacheConfiguration<>();
-
- cacheCfg2.setName(CACHE_2_NAME_DHT);
- cacheCfg2.setCacheMode(CacheMode.PARTITIONED);
- //cacheCfg2.setRebalanceBatchSize(1024);
- //cacheCfg2.setRebalanceBatchesCount(1);
- cacheCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
- cacheCfg2.setBackups(1);
-
- iCfg.setRebalanceThreadPoolSize(4);
- iCfg.setCacheConfiguration(cacheCfg, cacheCfg2);
- return iCfg;
- }
-
- /**
- * @param ignite Ignite.
- */
- protected void generateData(Ignite ignite) {
- try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) {
- for (int i = 0; i < TEST_SIZE; i++) {
- if (i % 1_000_000 == 0)
- log.info("Prepared " + i / 1_000_000 + "m entries.");
-
- stmr.addData(i, i);
- }
- }
- try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_2_NAME_DHT)) {
- for (int i = 0; i < TEST_SIZE; i++) {
- if (i % 1_000_000 == 0)
- log.info("Prepared " + i / 1_000_000 + "m entries.");
-
- stmr.addData(i, i + 3);
- }
- }
- }
-
- /**
- * @param ignite Ignite.
- * @throws IgniteCheckedException
- */
- protected void checkData(Ignite ignite) throws IgniteCheckedException {
- for (int i = 0; i < TEST_SIZE; i++) {
- if (i % 1_000_000 == 0)
- log.info("Checked " + i / 1_000_000 + "m entries.");
-
- assert ignite.cache(CACHE_NAME_DHT).get(i) != null && ignite.cache(CACHE_NAME_DHT).get(i).equals(i) :
- "keys " + i + " does not match (" + ignite.cache(CACHE_NAME_DHT).get(i) + ")";
- }
- for (int i = 0; i < TEST_SIZE; i++) {
- if (i % 1_000_000 == 0)
- log.info("Checked " + i / 1_000_000 + "m entries.");
-
- assert ignite.cache(CACHE_2_NAME_DHT).get(i) != null && ignite.cache(CACHE_2_NAME_DHT).get(i).equals(i + 3) :
- "keys " + i + " does not match (" + ignite.cache(CACHE_2_NAME_DHT).get(i) + ")";
- }
- }
-
- /**
- * @throws Exception
- */
- public void testSimpleRebalancing() throws Exception {
- Ignite ignite = startGrid(0);
-
- generateData(ignite);
-
- log.info("Preloading started.");
-
- long start = System.currentTimeMillis();
-
- startGrid(1);
-
- IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
-
- f1.get();
-
- long spend = (System.currentTimeMillis() - start) / 1000;
-
- stopGrid(0);
-
- checkData(grid(1));
-
- log.info("Spend " + spend + " seconds to rebalance entries.");
-
- stopAllGrids();
- }
-
- /**
- * @throws Exception
- */
- public void testComplexRebalancing() throws Exception {
- Ignite ignite = startGrid(0);
-
- generateData(ignite);
-
- log.info("Preloading started.");
-
- long start = System.currentTimeMillis();
-
- //will be started simultaneously in case of ASYNC mode
- startGrid(1);
- startGrid(2);
- startGrid(3);
- startGrid(4);
-
- //wait until cache rebalanced in async mode
-
- GridCachePreloader p11 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
- GridCachePreloader p12 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
- GridCachePreloader p13 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
- GridCachePreloader p14 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
-
- GridCachePreloader p21 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
- GridCachePreloader p22 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
- GridCachePreloader p23 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
- GridCachePreloader p24 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
-
- IgniteInternalFuture f24 = p24.syncFuture();
- f24.get();
-
- IgniteInternalFuture f14 = p14.syncFuture();
- f14.get();
-
- AffinityTopologyVersion f4Top = ((GridDhtPartitionDemander.SyncFuture)f24).topologyVersion();
-
- IgniteInternalFuture f11 = p11.syncFuture();
- IgniteInternalFuture f12 = p12.syncFuture();
- IgniteInternalFuture f13 = p13.syncFuture();
-
- while (!((GridDhtPartitionDemander.SyncFuture)f11).topologyVersion().equals(f4Top) ||
- !((GridDhtPartitionDemander.SyncFuture)f12).topologyVersion().equals(f4Top) ||
- !((GridDhtPartitionDemander.SyncFuture)f13).topologyVersion().equals(f4Top)) {
- U.sleep(100);
-
- f11 = p11.syncFuture();
- f12 = p12.syncFuture();
- f13 = p13.syncFuture();
- }
- f11.get();
- f12.get();
- f13.get();
-
- IgniteInternalFuture f21 = p21.syncFuture();
- IgniteInternalFuture f22 = p22.syncFuture();
- IgniteInternalFuture f23 = p23.syncFuture();
-
- while (!((GridDhtPartitionDemander.SyncFuture)f21).topologyVersion().equals(f4Top) ||
- !((GridDhtPartitionDemander.SyncFuture)f22).topologyVersion().equals(f4Top) ||
- !((GridDhtPartitionDemander.SyncFuture)f23).topologyVersion().equals(f4Top)) {
- U.sleep(100);
-
- f21 = p21.syncFuture();
- f22 = p22.syncFuture();
- f23 = p23.syncFuture();
- }
- f21.get();
- f22.get();
- f23.get();
-
- //cache rebalanced in async node
-
- f11 = p11.syncFuture();
- f12 = p12.syncFuture();
- f13 = p13.syncFuture();
- f14 = p14.syncFuture();
-
- f21 = p21.syncFuture();
- f22 = p22.syncFuture();
- f23 = p23.syncFuture();
- f24 = p24.syncFuture();
-
- stopGrid(0);
-
- //wait until cache rebalanced
-
- while (f11 == p11.syncFuture() || f12 == p12.syncFuture() || f13 == p13.syncFuture() || f14 == p14.syncFuture())
- U.sleep(100);
-
- while (f21 == p21.syncFuture() || f22 == p22.syncFuture() || f23 == p23.syncFuture() || f24 == p24.syncFuture())
- U.sleep(100);
-
- p11.syncFuture().get();
- p12.syncFuture().get();
- p13.syncFuture().get();
- p14.syncFuture().get();
-
- p21.syncFuture().get();
- p22.syncFuture().get();
- p23.syncFuture().get();
- p24.syncFuture().get();
-
- //cache rebalanced
-
- f12 = p12.syncFuture();
- f13 = p13.syncFuture();
- f14 = p14.syncFuture();
-
- f22 = p22.syncFuture();
- f23 = p23.syncFuture();
- f24 = p24.syncFuture();
-
- stopGrid(1);
-
- //wait until cache rebalanced
-
- while (f12 == p12.syncFuture() || f13 == p13.syncFuture() || f14 == p14.syncFuture())
- U.sleep(100);
-
- while (f22 == p22.syncFuture() || f23 == p23.syncFuture() || f24 == p24.syncFuture())
- U.sleep(100);
-
- p12.syncFuture().get();
- p13.syncFuture().get();
- p14.syncFuture().get();
-
- p22.syncFuture().get();
- p23.syncFuture().get();
- p24.syncFuture().get();
-
- //cache rebalanced
-
- f13 = p13.syncFuture();
- f14 = p14.syncFuture();
-
- f23 = p23.syncFuture();
- f24 = p24.syncFuture();
-
- stopGrid(2);
-
- //wait until cache rebalanced
-
- while (f13 == p13.syncFuture() || f14 == p14.syncFuture())
- U.sleep(100);
-
- while (f23 == p23.syncFuture() || f24 == p24.syncFuture())
- U.sleep(100);
-
- p13.syncFuture().get();
- p14.syncFuture().get();
-
- p23.syncFuture().get();
- p24.syncFuture().get();
-
- //cache rebalanced
-
- stopGrid(3);
-
- long spend = (System.currentTimeMillis() - start) / 1000;
-
- checkData(grid(4));
-
- log.info("Spend " + spend + " seconds to rebalance entries.");
-
- stopAllGrids();
- }
-
- /**
- * @throws Exception
- */
- public void _testOpPerSecRebalancingTest() throws Exception {
- startGrid(0);
-
- final AtomicBoolean cancelled = new AtomicBoolean(false);
-
- generateData(grid(0));
-
- startGrid(1);
- startGrid(2);
- startGrid(10);
-
- Thread t = new Thread(new Runnable() {
- @Override public void run() {
-
- long spend = 0;
-
- long ops = 0;
-
- while (!cancelled.get()) {
- try {
- long start = System.currentTimeMillis();
-
- int size = 1000;
-
- for (int i = 0; i < size; i++)
- grid(10).cachex(CACHE_NAME_DHT).remove(i);
-
- for (int i = 0; i < size; i++)
- grid(10).cachex(CACHE_NAME_DHT).put(i, i);
-
- spend += System.currentTimeMillis() - start;
-
- ops += size * 2;
- }
- catch (IgniteCheckedException e) {
- e.printStackTrace();
- }
-
- log.info("Ops. per ms: " + ops / spend);
- }
- }
- });
- t.start();
-
- stopGrid(0);
- startGrid(0);
-
- stopGrid(0);
- startGrid(0);
-
- stopGrid(0);
- startGrid(0);
-
- cancelled.set(true);
- t.join();
-
- checkData(grid(10));
-
- //stopAllGrids();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
new file mode 100644
index 0000000..a17fc7a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+
+/**
+ *
+ */
+public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncSelfTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+ CacheConfiguration cacheCfg = iCfg.getCacheConfiguration()[0];
+
+ cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
+
+ cacheCfg = iCfg.getCacheConfiguration()[1];
+
+ cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
+
+ iCfg.setDiscoverySpi(new FailableTcpDiscoverySpi());
+
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+
+ if (getTestGridName(20).equals(gridName))
+ spi = (FailableTcpDiscoverySpi)iCfg.getDiscoverySpi();
+
+ return iCfg;
+ }
+
+ public static class FailableTcpDiscoverySpi extends TcpDiscoverySpi {
+ public void fail() {
+ simulateNodeFailure();
+ }
+ }
+
+ private volatile FailableTcpDiscoverySpi spi;
+
+ /**
+ * @throws Exception
+ */
+ public void testNodeFailedAtRebalancing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ generateData(ignite);
+
+ log.info("Preloading started.");
+
+ startGrid(1);
+
+ waitForRebalancing(1, 2);
+
+ startGrid(20);
+
+ waitForRebalancing(20, 3);
+
+ spi.fail();
+
+ waitForRebalancing(0, 4);
+ waitForRebalancing(1, 4);
+
+ stopAllGrids();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
new file mode 100644
index 0000000..0cb6c7b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ private static int TEST_SIZE = 1_000_000;
+
+ /** cache name. */
+ protected static String CACHE_NAME_DHT = "cache";
+
+ /** cache 2 name. */
+ protected static String CACHE_2_NAME_DHT = "cache2";
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return Long.MAX_VALUE;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+
+ if (getTestGridName(10).equals(gridName))
+ iCfg.setClientMode(true);
+
+ CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
+ cacheCfg.setName(CACHE_NAME_DHT);
+ cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+ //cacheCfg.setRebalanceBatchSize(1024);
+ //cacheCfg.setRebalanceBatchesCount(1);
+ cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cacheCfg.setBackups(1);
+
+ CacheConfiguration<Integer, Integer> cacheCfg2 = new CacheConfiguration<>();
+
+ cacheCfg2.setName(CACHE_2_NAME_DHT);
+ cacheCfg2.setCacheMode(CacheMode.PARTITIONED);
+ //cacheCfg2.setRebalanceBatchSize(1024);
+ //cacheCfg2.setRebalanceBatchesCount(1);
+ cacheCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cacheCfg2.setBackups(1);
+
+ iCfg.setRebalanceThreadPoolSize(4);
+ iCfg.setCacheConfiguration(cacheCfg, cacheCfg2);
+ return iCfg;
+ }
+
+ /**
+ * @param ignite Ignite.
+ */
+ protected void generateData(Ignite ignite) {
+ try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) {
+ for (int i = 0; i < TEST_SIZE; i++) {
+ if (i % 1_000_000 == 0)
+ log.info("Prepared " + i / 1_000_000 + "m entries.");
+
+ stmr.addData(i, i);
+ }
+ }
+ try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_2_NAME_DHT)) {
+ for (int i = 0; i < TEST_SIZE; i++) {
+ if (i % 1_000_000 == 0)
+ log.info("Prepared " + i / 1_000_000 + "m entries.");
+
+ stmr.addData(i, i + 3);
+ }
+ }
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @throws IgniteCheckedException
+ */
+ protected void checkData(Ignite ignite) throws IgniteCheckedException {
+ for (int i = 0; i < TEST_SIZE; i++) {
+ if (i % 1_000_000 == 0)
+ log.info("Checked " + i / 1_000_000 + "m entries.");
+
+ assert ignite.cache(CACHE_NAME_DHT).get(i) != null && ignite.cache(CACHE_NAME_DHT).get(i).equals(i) :
+ "key " + i + " does not match (" + ignite.cache(CACHE_NAME_DHT).get(i) + ")";
+ }
+ for (int i = 0; i < TEST_SIZE; i++) {
+ if (i % 1_000_000 == 0)
+ log.info("Checked " + i / 1_000_000 + "m entries.");
+
+ assert ignite.cache(CACHE_2_NAME_DHT).get(i) != null && ignite.cache(CACHE_2_NAME_DHT).get(i).equals(i + 3) :
+ "key " + i + " does not match (" + ignite.cache(CACHE_2_NAME_DHT).get(i) + ")";
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testSimpleRebalancing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ generateData(ignite);
+
+ log.info("Preloading started.");
+
+ long start = System.currentTimeMillis();
+
+ startGrid(1);
+
+ IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+
+ f1.get();
+
+ long spend = (System.currentTimeMillis() - start) / 1000;
+
+ stopGrid(0);
+
+ checkData(grid(1));
+
+ log.info("Spend " + spend + " seconds to rebalance entries.");
+
+ stopAllGrids();
+ }
+
+ /**
+ * @param id Id.
+ * @param top Topology.
+ */
+ protected void waitForRebalancing(int id, int top) throws IgniteCheckedException {
+ boolean finished = false;
+
+ while (!finished) {
+ finished = true;
+
+ for (GridCacheAdapter c : grid(id).context().cache().internalCaches()) {
+ GridDhtPartitionDemander.SyncFuture fut = (GridDhtPartitionDemander.SyncFuture)c.preloader().syncFuture();
+ if (fut.topologyVersion().topologyVersion() != top) {
+ finished = false;
+
+ break;
+ }
+ else
+ fut.get();
+ }
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testComplexRebalancing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ generateData(ignite);
+
+ log.info("Preloading started.");
+
+ long start = System.currentTimeMillis();
+
+ //will be started simultaneously in case of ASYNC mode
+ startGrid(1);
+ startGrid(2);
+ startGrid(3);
+ startGrid(4);
+
+ //wait until cache rebalanced in async mode
+ waitForRebalancing(1, 5);
+ waitForRebalancing(2, 5);
+ waitForRebalancing(3, 5);
+ waitForRebalancing(4, 5);
+
+ //cache rebalanced in async node
+
+ stopGrid(0);
+
+ //wait until cache rebalanced
+ waitForRebalancing(1, 6);
+ waitForRebalancing(2, 6);
+ waitForRebalancing(3, 6);
+ waitForRebalancing(4, 6);
+
+ //cache rebalanced
+
+ stopGrid(1);
+
+ //wait until cache rebalanced
+ waitForRebalancing(2, 7);
+ waitForRebalancing(3, 7);
+ waitForRebalancing(4, 7);
+
+ //cache rebalanced
+
+ stopGrid(2);
+
+ //wait until cache rebalanced
+ waitForRebalancing(3, 8);
+ waitForRebalancing(4, 8);
+
+ //cache rebalanced
+
+ stopGrid(3);
+
+ long spend = (System.currentTimeMillis() - start) / 1000;
+
+ checkData(grid(4));
+
+ log.info("Spend " + spend + " seconds to rebalance entries.");
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void testBackwardCompatibility() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ Map<String, Object> map = new HashMap<>(ignite.cluster().localNode().attributes());
+
+ map.put(IgniteNodeAttributes.REBALANCING_VERSION, 0);
+
+ ((TcpDiscoveryNode)ignite.cluster().localNode()).setAttributes(map);
+
+ generateData(ignite);
+
+ startGrid(1);
+
+ waitForRebalancing(1, 2);
+
+ stopGrid(0);
+
+ checkData(grid(1));
+
+ }
+}
\ No newline at end of file