You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/28 11:18:37 UTC
[32/33] ignite git commit: 1093
1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/feea8f98
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/feea8f98
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/feea8f98
Branch: refs/heads/ignite-1093-3
Commit: feea8f983e5eab7b742e2bd1f2ec5c1b8d1ec6d4
Parents: d78e4cd
Author: Anton Vinogradov <av...@apache.org>
Authored: Wed Oct 28 12:55:43 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Wed Oct 28 12:55:43 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCachePreloader.java | 7 +
.../cache/GridCachePreloaderAdapter.java | 5 +
.../GridDhtPartitionDemandMessage.java | 2 -
.../dht/preloader/GridDhtPartitionDemander.java | 44 +++++-
.../dht/preloader/GridDhtPartitionSupplier.java | 156 +++++++++----------
.../GridDhtPartitionSupplyMessageV2.java | 2 -
.../GridDhtPartitionsExchangeFuture.java | 2 +
.../dht/preloader/GridDhtPreloader.java | 5 +
.../GridCacheRebalancingSyncSelfTest.java | 12 +-
9 files changed, 139 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 79861a2..b2bb8f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -171,4 +171,11 @@ public interface GridCachePreloader {
* @param part Partition.
*/
public void evictPartitionAsync(GridDhtLocalPartition part);
+
+ /**
+ * Handles new topology.
+ *
+ * @param topVer Topology version.
+ */
+ public void onTopologyChanged(AffinityTopologyVersion topVer);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index b784383..d465950 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -173,4 +173,9 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
@Override public void evictPartitionAsync(GridDhtLocalPartition part) {
// No-op.
}
+
+ /** {@inheritDoc} */
+ @Override public void onTopologyChanged(AffinityTopologyVersion topVer) {
+ // No-op.
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 4ac644a..e99fa9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -68,8 +68,6 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
* @param topVer Topology version.
*/
GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int cacheId) {
- assert updateSeq > 0;
-
this.cacheId = cacheId;
this.updateSeq = updateSeq;
this.topVer = topVer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 6479542..deedf21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -377,8 +377,12 @@ public class GridDhtPartitionDemander {
initD.updateSequence(fut.updateSeq);
initD.timeout(cctx.config().getRebalanceTimeout());
- cctx.io().sendOrderedMessage(node,
- GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), initD.timeout());
+ synchronized (fut) {
+ if (!fut.isDone())// Future can be already cancelled at this moment and all failovers happened.
+ // New requests will not be covered by failovers.
+ cctx.io().sendOrderedMessage(node,
+ GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), initD.timeout());
+ }
if (log.isDebugEnabled())
log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
@@ -810,11 +814,15 @@ public class GridDhtPartitionDemander {
if (isDone())
return true;
- remaining.clear();
-
U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
+ ", topology=" + topologyVersion());
+ for (UUID nodeId : remaining.keySet()) {
+ cleanupRemoteContexts(nodeId);
+ }
+
+ remaining.clear();
+
checkIsDone(true /* cancelled */);
}
@@ -833,6 +841,8 @@ public class GridDhtPartitionDemander {
", fromNode=" + nodeId + ", topology=" + topologyVersion() +
", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
+ cleanupRemoteContexts(nodeId);
+
remaining.remove(nodeId);
checkIsDone();
@@ -856,6 +866,32 @@ public class GridDhtPartitionDemander {
}
}
+ private void cleanupRemoteContexts(UUID nodeId) {
+ ClusterNode node = cctx.discovery().node(nodeId);
+
+ //Check remote node rebalancing API version.
+ if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) {
+
+ GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
+ -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId());
+
+ d.timeout(cctx.config().getRebalanceTimeout());
+
+ try {
+ for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
+ d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
+
+ cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+ d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+ }
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send failover context cleanup request to node");
+ }
+ }
+ }
+
/**
* @param nodeId Node id.
* @param p P.
http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index f5ae93b..9db2dc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -18,17 +18,15 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -40,13 +38,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
-import org.jsr166.ConcurrentHashMap8;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
/**
@@ -68,12 +63,8 @@ class GridDhtPartitionSupplier {
/** Preload predicate. */
private IgnitePredicate<GridCacheEntryInfo> preloadPred;
- /** Supply context map. T2: nodeId, idx. */
- private final ConcurrentHashMap8<T2<UUID, Integer>, SupplyContext> scMap =
- new ConcurrentHashMap8<>();
-
- /** Rebalancing listener. */
- private GridLocalEventListener lsnr;
+ /** Supply context map. T2: nodeId, idx, topVer. */
+ private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>();
/**
* @param cctx Cache context.
@@ -94,32 +85,6 @@ class GridDhtPartitionSupplier {
*
*/
void start() {
- lsnr = new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- if (evt instanceof DiscoveryEvent) {
- for (Map.Entry<T2<UUID, Integer>, SupplyContext> entry : scMap.entrySet()) {
- T2<UUID, Integer> t = entry.getKey();
-
- if (t.get1().equals(((DiscoveryEvent)evt).eventNode().id())) {
- SupplyContext sctx = entry.getValue();
-
- clearContext(sctx, log);
-
- if (log.isDebugEnabled())
- log.debug("Supply context removed for failed or left node [node=" + t.get1() + "]");
-
- scMap.remove(t, sctx);
- }
- }
- }
- else {
- assert false;
- }
- }
- };
-
- cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
-
startOldListeners();
}
@@ -127,11 +92,16 @@ class GridDhtPartitionSupplier {
*
*/
void stop() {
- if (lsnr != null)
- cctx.events().removeListener(lsnr);
+ synchronized (scMap) {
+ Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator();
+
+ while (it.hasNext()) {
+ T3<UUID, Integer, AffinityTopologyVersion> t = it.next();
- for (Map.Entry<T2<UUID, Integer>, SupplyContext> entry : scMap.entrySet()) {
- clearContext(entry.getValue(), log);
+ clearContext(scMap.get(t), log);
+
+ it.remove();
+ }
}
stopOldListeners();
@@ -152,10 +122,7 @@ class GridDhtPartitionSupplier {
if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) {
try {
- synchronized (it) {
- if (!((GridCloseableIterator)it).isClosed())
- ((GridCloseableIterator)it).close();
- }
+ ((GridCloseableIterator)it).close();
}
catch (IgniteCheckedException e) {
log.error("Iterator close failed.", e);
@@ -164,12 +131,34 @@ class GridDhtPartitionSupplier {
final GridDhtLocalPartition loc = sc.loc;
- if (loc != null && loc.reservations() > 0) {
- synchronized (loc) {
- if (loc.reservations() > 0)
- loc.release();
- }
+ if (loc != null) {
+ assert loc.reservations() > 0;
+ loc.release();
+ }
+ }
+ }
+
+ /**
+ * Handles new topology.
+ *
+ * @param topVer Topology version.
+ */
+ public void onTopologyChanged(AffinityTopologyVersion topVer) {
+ synchronized (scMap) {
+ Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator();
+
+ while (it.hasNext()) {
+ T3<UUID, Integer, AffinityTopologyVersion> t = it.next();
+
+ if (topVer.compareTo(t.get3()) > 0) {// Clear all obsolete contexts.
+ clearContext(scMap.get(t), log);
+
+ it.remove();
+
+ if (log.isDebugEnabled())
+ log.debug("Supply context removed [node=" + t.get1() + "]");
+ }
}
}
}
@@ -195,6 +184,16 @@ class GridDhtPartitionSupplier {
AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion();
AffinityTopologyVersion demTop = d.topologyVersion();
+ T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, demTop);
+
+ if (d.updateSequence() == -1) {//Demand node requested context cleanup.
+ synchronized (scMap) {
+ clearContext(scMap.remove(scId), log);
+
+ return;
+ }
+ }
+
if (cutTop.compareTo(demTop) > 0) {
if (log.isDebugEnabled())
log.debug("Demand request cancelled [current=" + cutTop + ", demanded=" + demTop +
@@ -212,16 +211,13 @@ class GridDhtPartitionSupplier {
ClusterNode node = cctx.discovery().node(id);
- T2<UUID, Integer> scId = new T2<>(id, idx);
-
try {
- SupplyContext sctx = scMap.remove(scId);
+ SupplyContext sctx;
- // Context will be cleaned in case topology changed.
- if (sctx != null && (!d.topologyVersion().equals(sctx.topVer) || d.updateSequence() != sctx.updateSeq)) {
- clearContext(sctx, log);
+ synchronized (scMap) {
+ sctx = scMap.remove(scId);
- sctx = null;
+ assert sctx == null || d.updateSequence() == sctx.updateSeq;
}
// Initial demand request should contain partitions list.
@@ -371,7 +367,6 @@ class GridDhtPartitionSupplier {
swapLsnr,
part,
loc,
- d.topologyVersion(),
d.updateSequence());
}
}
@@ -497,7 +492,6 @@ class GridDhtPartitionSupplier {
null,
part,
loc,
- d.topologyVersion(),
d.updateSequence());
}
}
@@ -579,8 +573,6 @@ class GridDhtPartitionSupplier {
}
}
- scMap.remove(scId);
-
reply(node, d, s, scId);
if (log.isDebugEnabled())
@@ -604,7 +596,7 @@ class GridDhtPartitionSupplier {
private boolean reply(ClusterNode n,
GridDhtPartitionDemandMessage d,
GridDhtPartitionSupplyMessageV2 s,
- T2<UUID, Integer> scId)
+ T3<UUID, Integer, AffinityTopologyVersion> scId)
throws IgniteCheckedException {
try {
@@ -623,7 +615,9 @@ class GridDhtPartitionSupplier {
if (log.isDebugEnabled())
log.debug("Failed to send partition supply message because node left grid: " + n.id());
- clearContext(scMap.remove(scId), log);
+ synchronized (scMap) {
+ clearContext(scMap.remove(scId), log);
+ }
return false;
}
@@ -638,7 +632,7 @@ class GridDhtPartitionSupplier {
* @param swapLsnr Swap listener.
*/
private void saveSupplyContext(
- T2<UUID, Integer> t,
+ T3<UUID, Integer, AffinityTopologyVersion> t,
int phase,
Iterator<Integer> partIt,
int part,
@@ -646,17 +640,20 @@ class GridDhtPartitionSupplier {
GridDhtLocalPartition loc,
AffinityTopologyVersion topVer,
long updateSeq) {
- SupplyContext old = scMap.putIfAbsent(t,
- new SupplyContext(phase,
- partIt,
- entryIt,
- swapLsnr,
- part,
- loc,
- topVer,
- updateSeq));
-
- assert old == null;
+ synchronized (scMap) {
+ if (cctx.affinity().affinityTopologyVersion().equals(topVer)) {
+ assert scMap.get(t) == null;
+
+ scMap.put(t,
+ new SupplyContext(phase,
+ partIt,
+ entryIt,
+ swapLsnr,
+ part,
+ loc,
+ updateSeq));
+ }
+ }
}
/**
@@ -681,9 +678,6 @@ class GridDhtPartitionSupplier {
/** Local partition. */
private final GridDhtLocalPartition loc;
- /** Topology version. */
- private final AffinityTopologyVersion topVer;
-
/** Update seq. */
private final long updateSeq;
@@ -700,7 +694,6 @@ class GridDhtPartitionSupplier {
GridCacheEntryInfoCollectSwapListener swapLsnr,
int part,
GridDhtLocalPartition loc,
- AffinityTopologyVersion topVer,
long updateSeq) {
this.phase = phase;
this.partIt = partIt;
@@ -708,7 +701,6 @@ class GridDhtPartitionSupplier {
this.swapLsnr = swapLsnr;
this.part = part;
this.loc = loc;
- this.topVer = topVer;
this.updateSeq = updateSeq;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
index d68e417..502620c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -81,8 +81,6 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
* @param addDepInfo Deployment info flag.
*/
GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer, boolean addDepInfo) {
- assert updateSeq > 0;
-
this.cacheId = cacheId;
this.updateSeq = updateSeq;
this.topVer = topVer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 77e47a7..5c7190b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -742,6 +742,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
// Must initialize topology after we get discovery event.
initTopology(cacheCtx);
+ cacheCtx.preloader().onTopologyChanged(exchId.topologyVersion());
+
cacheCtx.preloader().updateLastExchangeFuture(this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 64d5a19..36e0c9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -272,6 +272,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
+ @Override public void onTopologyChanged(AffinityTopologyVersion topVer) {
+ supplier.onTopologyChanged(topVer);
+ }
+
+ /** {@inheritDoc} */
@Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
// No assignments for disabled preloader.
GridDhtPartitionTopology top = cctx.dht().topology();
http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index cea7808..b17588f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -316,7 +316,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
Map map = U.field(supplier, "scMap");
- assert map.isEmpty();
+ synchronized (map) {
+ assert map.isEmpty();
+ }
}
}
}
@@ -357,8 +359,6 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
waitForRebalancing(3, 5, 0);
waitForRebalancing(4, 5, 0);
- checkSupplyContextMapIsEmpty();
-
//New cache should start rebalancing.
CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>();
@@ -431,14 +431,14 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
t4.start();
- stopGrid(0);
+ stopGrid(1);
- waitForRebalancing(1, 6);
+ waitForRebalancing(0, 6);
waitForRebalancing(2, 6);
waitForRebalancing(3, 6);
waitForRebalancing(4, 6);
- stopGrid(1);
+ stopGrid(0);
waitForRebalancing(2, 7);
waitForRebalancing(3, 7);