You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/28 11:18:37 UTC

[32/33] ignite git commit: 1093

1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/feea8f98
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/feea8f98
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/feea8f98

Branch: refs/heads/ignite-1093-3
Commit: feea8f983e5eab7b742e2bd1f2ec5c1b8d1ec6d4
Parents: d78e4cd
Author: Anton Vinogradov <av...@apache.org>
Authored: Wed Oct 28 12:55:43 2015 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Wed Oct 28 12:55:43 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCachePreloader.java    |   7 +
 .../cache/GridCachePreloaderAdapter.java        |   5 +
 .../GridDhtPartitionDemandMessage.java          |   2 -
 .../dht/preloader/GridDhtPartitionDemander.java |  44 +++++-
 .../dht/preloader/GridDhtPartitionSupplier.java | 156 +++++++++----------
 .../GridDhtPartitionSupplyMessageV2.java        |   2 -
 .../GridDhtPartitionsExchangeFuture.java        |   2 +
 .../dht/preloader/GridDhtPreloader.java         |   5 +
 .../GridCacheRebalancingSyncSelfTest.java       |  12 +-
 9 files changed, 139 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 79861a2..b2bb8f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -171,4 +171,11 @@ public interface GridCachePreloader {
      * @param part Partition.
      */
     public void evictPartitionAsync(GridDhtLocalPartition part);
+
+    /**
+     * Handles new topology.
+     *
+     * @param topVer Topology version.
+     */
+    public void onTopologyChanged(AffinityTopologyVersion topVer);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index b784383..d465950 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -173,4 +173,9 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override public void onTopologyChanged(AffinityTopologyVersion topVer) {
+        // No-op.
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 4ac644a..e99fa9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -68,8 +68,6 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
      * @param topVer Topology version.
      */
     GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int cacheId) {
-        assert updateSeq > 0;
-
         this.cacheId = cacheId;
         this.updateSeq = updateSeq;
         this.topVer = topVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 6479542..deedf21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -377,8 +377,12 @@ public class GridDhtPartitionDemander {
                         initD.updateSequence(fut.updateSeq);
                         initD.timeout(cctx.config().getRebalanceTimeout());
 
-                        cctx.io().sendOrderedMessage(node,
-                            GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), initD.timeout());
+                        synchronized (fut) {
+                            if (!fut.isDone())// Future can be already cancelled at this moment and all failovers happened.
+                            // New requests will not be covered by failovers.
+                                cctx.io().sendOrderedMessage(node,
+                                    GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), initD.timeout());
+                        }
 
                         if (log.isDebugEnabled())
                             log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
@@ -810,11 +814,15 @@ public class GridDhtPartitionDemander {
                 if (isDone())
                     return true;
 
-                remaining.clear();
-
                 U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
                     + ", topology=" + topologyVersion());
 
+                for (UUID nodeId : remaining.keySet()) {
+                    cleanupRemoteContexts(nodeId);
+                }
+
+                remaining.clear();
+
                 checkIsDone(true /* cancelled */);
             }
 
@@ -833,6 +841,8 @@ public class GridDhtPartitionDemander {
                     ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
                     ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
 
+                cleanupRemoteContexts(nodeId);
+
                 remaining.remove(nodeId);
 
                 checkIsDone();
@@ -856,6 +866,32 @@ public class GridDhtPartitionDemander {
             }
         }
 
+        private void cleanupRemoteContexts(UUID nodeId) {
+            ClusterNode node = cctx.discovery().node(nodeId);
+
+            //Check remote node rebalancing API version.
+            if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) {
+
+                GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
+                    -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId());
+
+                d.timeout(cctx.config().getRebalanceTimeout());
+
+                try {
+                    for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
+                        d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
+
+                        cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+                            d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send failover context cleanup request to node");
+                }
+            }
+        }
+
         /**
          * @param nodeId Node id.
          * @param p P.

http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index f5ae93b..9db2dc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -18,17 +18,15 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -40,13 +38,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
-import org.jsr166.ConcurrentHashMap8;
 
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 
 /**
@@ -68,12 +63,8 @@ class GridDhtPartitionSupplier {
     /** Preload predicate. */
     private IgnitePredicate<GridCacheEntryInfo> preloadPred;
 
-    /** Supply context map. T2: nodeId, idx. */
-    private final ConcurrentHashMap8<T2<UUID, Integer>, SupplyContext> scMap =
-        new ConcurrentHashMap8<>();
-
-    /** Rebalancing listener. */
-    private GridLocalEventListener lsnr;
+    /** Supply context map. T2: nodeId, idx, topVer. */
+    private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>();
 
     /**
      * @param cctx Cache context.
@@ -94,32 +85,6 @@ class GridDhtPartitionSupplier {
      *
      */
     void start() {
-        lsnr = new GridLocalEventListener() {
-            @Override public void onEvent(Event evt) {
-                if (evt instanceof DiscoveryEvent) {
-                    for (Map.Entry<T2<UUID, Integer>, SupplyContext> entry : scMap.entrySet()) {
-                        T2<UUID, Integer> t = entry.getKey();
-
-                        if (t.get1().equals(((DiscoveryEvent)evt).eventNode().id())) {
-                            SupplyContext sctx = entry.getValue();
-
-                            clearContext(sctx, log);
-
-                            if (log.isDebugEnabled())
-                                log.debug("Supply context removed for failed or left node [node=" + t.get1() + "]");
-
-                            scMap.remove(t, sctx);
-                        }
-                    }
-                }
-                else {
-                    assert false;
-                }
-            }
-        };
-
-        cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
-
         startOldListeners();
     }
 
@@ -127,11 +92,16 @@ class GridDhtPartitionSupplier {
      *
      */
     void stop() {
-        if (lsnr != null)
-            cctx.events().removeListener(lsnr);
+        synchronized (scMap) {
+            Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator();
+
+            while (it.hasNext()) {
+                T3<UUID, Integer, AffinityTopologyVersion> t = it.next();
 
-        for (Map.Entry<T2<UUID, Integer>, SupplyContext> entry : scMap.entrySet()) {
-            clearContext(entry.getValue(), log);
+                clearContext(scMap.get(t), log);
+
+                it.remove();
+            }
         }
 
         stopOldListeners();
@@ -152,10 +122,7 @@ class GridDhtPartitionSupplier {
 
             if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) {
                 try {
-                    synchronized (it) {
-                        if (!((GridCloseableIterator)it).isClosed())
-                            ((GridCloseableIterator)it).close();
-                    }
+                    ((GridCloseableIterator)it).close();
                 }
                 catch (IgniteCheckedException e) {
                     log.error("Iterator close failed.", e);
@@ -164,12 +131,34 @@ class GridDhtPartitionSupplier {
 
             final GridDhtLocalPartition loc = sc.loc;
 
-            if (loc != null && loc.reservations() > 0) {
-                synchronized (loc) {
-                    if (loc.reservations() > 0)
-                        loc.release();
-                }
+            if (loc != null) {
+                assert loc.reservations() > 0;
 
+                loc.release();
+            }
+        }
+    }
+
+    /**
+     * Handles new topology.
+     *
+     * @param topVer Topology version.
+     */
+    public void onTopologyChanged(AffinityTopologyVersion topVer) {
+        synchronized (scMap) {
+            Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator();
+
+            while (it.hasNext()) {
+                T3<UUID, Integer, AffinityTopologyVersion> t = it.next();
+
+                if (topVer.compareTo(t.get3()) > 0) {// Clear all obsolete contexts.
+                    clearContext(scMap.get(t), log);
+
+                    it.remove();
+
+                    if (log.isDebugEnabled())
+                        log.debug("Supply context removed [node=" + t.get1() + "]");
+                }
             }
         }
     }
@@ -195,6 +184,16 @@ class GridDhtPartitionSupplier {
         AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion();
         AffinityTopologyVersion demTop = d.topologyVersion();
 
+        T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, demTop);
+
+        if (d.updateSequence() == -1) {//Demand node requested context cleanup.
+            synchronized (scMap) {
+                clearContext(scMap.remove(scId), log);
+
+                return;
+            }
+        }
+
         if (cutTop.compareTo(demTop) > 0) {
             if (log.isDebugEnabled())
                 log.debug("Demand request cancelled [current=" + cutTop + ", demanded=" + demTop +
@@ -212,16 +211,13 @@ class GridDhtPartitionSupplier {
 
         ClusterNode node = cctx.discovery().node(id);
 
-        T2<UUID, Integer> scId = new T2<>(id, idx);
-
         try {
-            SupplyContext sctx = scMap.remove(scId);
+            SupplyContext sctx;
 
-            // Context will be cleaned in case topology changed.
-            if (sctx != null && (!d.topologyVersion().equals(sctx.topVer) || d.updateSequence() != sctx.updateSeq)) {
-                clearContext(sctx, log);
+            synchronized (scMap) {
+                sctx = scMap.remove(scId);
 
-                sctx = null;
+                assert sctx == null || d.updateSequence() == sctx.updateSeq;
             }
 
             // Initial demand request should contain partitions list.
@@ -371,7 +367,6 @@ class GridDhtPartitionSupplier {
                                 swapLsnr,
                                 part,
                                 loc,
-                                d.topologyVersion(),
                                 d.updateSequence());
                         }
                     }
@@ -497,7 +492,6 @@ class GridDhtPartitionSupplier {
                                 null,
                                 part,
                                 loc,
-                                d.topologyVersion(),
                                 d.updateSequence());
                         }
                     }
@@ -579,8 +573,6 @@ class GridDhtPartitionSupplier {
                 }
             }
 
-            scMap.remove(scId);
-
             reply(node, d, s, scId);
 
             if (log.isDebugEnabled())
@@ -604,7 +596,7 @@ class GridDhtPartitionSupplier {
     private boolean reply(ClusterNode n,
         GridDhtPartitionDemandMessage d,
         GridDhtPartitionSupplyMessageV2 s,
-        T2<UUID, Integer> scId)
+        T3<UUID, Integer, AffinityTopologyVersion> scId)
         throws IgniteCheckedException {
 
         try {
@@ -623,7 +615,9 @@ class GridDhtPartitionSupplier {
             if (log.isDebugEnabled())
                 log.debug("Failed to send partition supply message because node left grid: " + n.id());
 
-            clearContext(scMap.remove(scId), log);
+            synchronized (scMap) {
+                clearContext(scMap.remove(scId), log);
+            }
 
             return false;
         }
@@ -638,7 +632,7 @@ class GridDhtPartitionSupplier {
      * @param swapLsnr Swap listener.
      */
     private void saveSupplyContext(
-        T2<UUID, Integer> t,
+        T3<UUID, Integer, AffinityTopologyVersion> t,
         int phase,
         Iterator<Integer> partIt,
         int part,
@@ -646,17 +640,20 @@ class GridDhtPartitionSupplier {
         GridDhtLocalPartition loc,
         AffinityTopologyVersion topVer,
         long updateSeq) {
-        SupplyContext old = scMap.putIfAbsent(t,
-            new SupplyContext(phase,
-                partIt,
-                entryIt,
-                swapLsnr,
-                part,
-                loc,
-                topVer,
-                updateSeq));
-
-        assert old == null;
+        synchronized (scMap) {
+            if (cctx.affinity().affinityTopologyVersion().equals(topVer)) {
+                assert scMap.get(t) == null;
+
+                scMap.put(t,
+                    new SupplyContext(phase,
+                        partIt,
+                        entryIt,
+                        swapLsnr,
+                        part,
+                        loc,
+                        updateSeq));
+            }
+        }
     }
 
     /**
@@ -681,9 +678,6 @@ class GridDhtPartitionSupplier {
         /** Local partition. */
         private final GridDhtLocalPartition loc;
 
-        /** Topology version. */
-        private final AffinityTopologyVersion topVer;
-
         /** Update seq. */
         private final long updateSeq;
 
@@ -700,7 +694,6 @@ class GridDhtPartitionSupplier {
             GridCacheEntryInfoCollectSwapListener swapLsnr,
             int part,
             GridDhtLocalPartition loc,
-            AffinityTopologyVersion topVer,
             long updateSeq) {
             this.phase = phase;
             this.partIt = partIt;
@@ -708,7 +701,6 @@ class GridDhtPartitionSupplier {
             this.swapLsnr = swapLsnr;
             this.part = part;
             this.loc = loc;
-            this.topVer = topVer;
             this.updateSeq = updateSeq;
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
index d68e417..502620c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -81,8 +81,6 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
      * @param addDepInfo Deployment info flag.
      */
     GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer, boolean addDepInfo) {
-        assert updateSeq > 0;
-
         this.cacheId = cacheId;
         this.updateSeq = updateSeq;
         this.topVer = topVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 77e47a7..5c7190b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -742,6 +742,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     // Must initialize topology after we get discovery event.
                     initTopology(cacheCtx);
 
+                    cacheCtx.preloader().onTopologyChanged(exchId.topologyVersion());
+
                     cacheCtx.preloader().updateLastExchangeFuture(this);
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 64d5a19..36e0c9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -272,6 +272,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public void onTopologyChanged(AffinityTopologyVersion topVer) {
+        supplier.onTopologyChanged(topVer);
+    }
+
+    /** {@inheritDoc} */
     @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
         // No assignments for disabled preloader.
         GridDhtPartitionTopology top = cctx.dht().topology();

http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index cea7808..b17588f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -316,7 +316,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
                 Map map = U.field(supplier, "scMap");
 
-                assert map.isEmpty();
+                synchronized (map) {
+                    assert map.isEmpty();
+                }
             }
         }
     }
@@ -357,8 +359,6 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
                     waitForRebalancing(3, 5, 0);
                     waitForRebalancing(4, 5, 0);
 
-                    checkSupplyContextMapIsEmpty();
-
                     //New cache should start rebalancing.
                     CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>();
 
@@ -431,14 +431,14 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         t4.start();
 
-        stopGrid(0);
+        stopGrid(1);
 
-        waitForRebalancing(1, 6);
+        waitForRebalancing(0, 6);
         waitForRebalancing(2, 6);
         waitForRebalancing(3, 6);
         waitForRebalancing(4, 6);
 
-        stopGrid(1);
+        stopGrid(0);
 
         waitForRebalancing(2, 7);
         waitForRebalancing(3, 7);