You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/11 11:18:58 UTC

ignite git commit: ignite-5446 Only lateAffinity logic in CacheAffinitySharedManager.

Repository: ignite
Updated Branches:
  refs/heads/master bf25b5c52 -> e93b28488


ignite-5446 Only lateAffinity logic in CacheAffinitySharedManager.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e93b2848
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e93b2848
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e93b2848

Branch: refs/heads/master
Commit: e93b28488693381fcd232de93087ab8ec1d0f5bb
Parents: bf25b5c
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jul 11 14:18:52 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jul 11 14:18:52 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 211 +++++++------------
 1 file changed, 74 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e93b2848/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 548d795..79ab183 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -83,9 +83,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private final long clientCacheMsgTimeout =
         IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT, 10_000);
 
-    /** Late affinity assignment flag. */
-    private boolean lateAffAssign;
-
     /** Affinity information for all started caches (initialized on coordinator). */
     private ConcurrentMap<Integer, CacheGroupHolder> grpHolders = new ConcurrentHashMap<>();
 
@@ -132,13 +129,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
 
-        if (cctx.database().persistenceEnabled() && !cctx.kernalContext().config().isLateAffinityAssignment())
-            U.quietAndWarn(log,
-                "Persistence is enabled, but late affinity assignment is disabled. " +
-                    "Since it is required for persistence mode, it will be implicitly enabled.");
-
-        lateAffAssign = cctx.kernalContext().config().isLateAffinityAssignment() || cctx.database().persistenceEnabled();
-
         cctx.kernalContext().event().addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
 
@@ -193,8 +183,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @return {@code True} if minor topology version should be increased.
      */
     boolean onCustomEvent(CacheAffinityChangeMessage msg) {
-        assert lateAffAssign : msg;
-
         if (msg.exchangeId() != null) {
             if (log.isDebugEnabled()) {
                 log.debug("Ignore affinity change message [lastAffVer=" + lastAffVer +
@@ -259,9 +247,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param checkGrpId Group ID.
      */
     void checkRebalanceState(GridDhtPartitionTopology top, Integer checkGrpId) {
-        if (!lateAffAssign)
-            return;
-
         CacheAffinityChangeMessage msg = null;
 
         synchronized (mux) {
@@ -349,13 +334,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param grp Cache group.
      */
     void onCacheGroupCreated(CacheGroupContext grp) {
-        final Integer grpId = grp.groupId();
-
         if (!grpHolders.containsKey(grp.groupId())) {
-            cctx.io().addCacheGroupHandler(grpId, GridDhtAffinityAssignmentResponse.class,
+            cctx.io().addCacheGroupHandler(grp.groupId(), GridDhtAffinityAssignmentResponse.class,
                 new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
                     @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-                        processAffinityAssignmentResponse(grpId, nodeId, res);
+                        processAffinityAssignmentResponse(nodeId, res);
                     }
                 });
         }
@@ -692,7 +675,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         caches.updateCachesInfo(exchActions);
 
         // Affinity did not change for existing caches.
-        forAllCacheGroups(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+        forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                 if (exchActions.cacheGroupStopping(aff.groupId()))
                     return;
@@ -772,7 +755,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             Integer grpId = action.descriptor().groupId();
 
             if (gprs.add(grpId)) {
-                if (crd && lateAffAssign)
+                if (crd)
                     initStartedGroupOnCoordinator(fut, action.descriptor().groupDescriptor());
                 else {
                     CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
@@ -813,7 +796,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         Set<Integer> stoppedGrps = null;
 
-        if (crd && lateAffAssign) {
+        if (crd) {
             for (ExchangeActions.CacheGroupActionData data : exchActions.cacheGroupsToStop()) {
                 if (data.descriptor().config().getCacheMode() != LOCAL) {
                     CacheGroupHolder cacheGrp = grpHolders.remove(data.descriptor().groupId());
@@ -1026,32 +1009,17 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     public void onClientEvent(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
         boolean locJoin = fut.discoveryEvent().eventNode().isLocal();
 
-        if (lateAffAssign) {
-            if (!locJoin) {
-                forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
-                    @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
-                        AffinityTopologyVersion topVer = fut.topologyVersion();
-
-                        aff.clientEventTopologyChange(fut.discoveryEvent(), topVer);
-                    }
-                });
-            }
-            else
-                fetchAffinityOnJoin(fut);
-        }
-        else {
-            if (!locJoin) {
-                forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
-                    @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
-                        AffinityTopologyVersion topVer = fut.topologyVersion();
+        if (!locJoin) {
+            forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+                @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
+                    AffinityTopologyVersion topVer = fut.topologyVersion();
 
-                        aff.clientEventTopologyChange(fut.discoveryEvent(), topVer);
-                    }
-                });
-            }
-            else
-                initAffinityNoLateAssignment(fut);
+                    aff.clientEventTopologyChange(fut.discoveryEvent(), topVer);
+                }
+            });
         }
+        else
+            fetchAffinityOnJoin(fut);
     }
 
     /**
@@ -1074,11 +1042,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * @param grpId Cache group ID.
      * @param nodeId Node ID.
      * @param res Response.
      */
-    private void processAffinityAssignmentResponse(Integer grpId, UUID nodeId, GridDhtAffinityAssignmentResponse res) {
+    private void processAffinityAssignmentResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
         if (log.isDebugEnabled())
             log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']');
 
@@ -1093,8 +1060,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed
      */
     private void forAllRegisteredCacheGroups(IgniteInClosureX<CacheGroupDescriptor> c) throws IgniteCheckedException {
-        assert lateAffAssign;
-
         for (CacheGroupDescriptor cacheDesc : caches.allGroups()) {
             if (cacheDesc.config().getCacheMode() == LOCAL)
                 continue;
@@ -1179,10 +1144,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     ) throws IgniteCheckedException {
         caches.initStartedCaches(descs);
 
-        if (crd && lateAffAssign) {
+        if (crd) {
             forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
                 @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
-                    CacheGroupHolder cache = groupHolder(fut, desc);
+                    CacheGroupHolder cache = groupHolder(fut.topologyVersion(), desc);
 
                     if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
                         List<List<ClusterNode>> assignment =
@@ -1247,7 +1212,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         assert desc != null : aff.cacheOrGroupName();
 
         // Do not request affinity from remote nodes if affinity function is not centralized.
-        if (!lateAffAssign && !aff.centralizedAffinityFunction())
+        if (!aff.centralizedAffinityFunction())
             return true;
 
         // If local node did not initiate exchange or local node is the only cache node in grid.
@@ -1272,31 +1237,31 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         WaitRebalanceInfo waitRebalanceInfo = null;
 
-        if (lateAffAssign) {
-            if (locJoin) {
-                if (crd) {
-                    forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
-                        @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
-                            AffinityTopologyVersion topVer = fut.topologyVersion();
+        if (locJoin) {
+            if (crd) {
+                forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
+                    @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
+                        AffinityTopologyVersion topVer = fut.topologyVersion();
 
-                            CacheGroupHolder cache = groupHolder(fut, desc);
+                        CacheGroupHolder cache = groupHolder(topVer, desc);
 
-                            List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer,
-                                fut.discoveryEvent(),
-                                fut.discoCache());
+                        List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer,
+                            fut.discoveryEvent(),
+                            fut.discoCache());
 
-                            cache.affinity().initialize(topVer, newAff);
-                        }
-                    });
-                }
-                else
-                    fetchAffinityOnJoin(fut);
+                        cache.affinity().initialize(topVer, newAff);
+                    }
+                });
             }
             else
-                waitRebalanceInfo = initAffinityOnNodeJoin(fut, crd);
+                fetchAffinityOnJoin(fut);
+        }
+        else {
+            waitRebalanceInfo = initAffinityOnNodeJoin(fut.topologyVersion(),
+                fut.discoveryEvent(),
+                fut.discoCache(),
+                crd);
         }
-        else
-            initAffinityNoLateAssignment(fut);
 
         synchronized (mux) {
             affCalcVer = fut.topologyVersion();
@@ -1305,7 +1270,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             WaitRebalanceInfo info = this.waitInfo;
 
-            if (crd && lateAffAssign) {
+            if (crd) {
                 if (log.isDebugEnabled()) {
                     log.debug("Computed new affinity after node join [topVer=" + fut.topologyVersion() +
                         ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']');
@@ -1412,7 +1377,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             if (idealAff != null)
                 affCache.idealAssignment(idealAff);
             else {
-                assert !affCache.centralizedAffinityFunction() || !lateAffAssign;
+                assert !affCache.centralizedAffinityFunction();
 
                 affCache.calculate(topVer, discoveryEvt, discoCache);
             }
@@ -1439,22 +1404,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         assert !leftNode.isClient() : leftNode;
 
-        boolean centralizedAff;
-
-        if (lateAffAssign) {
-            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                if (grp.isLocal())
-                    continue;
-
-                grp.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
-            }
-
-            centralizedAff = true;
-        }
-        else {
-            initAffinityNoLateAssignment(fut);
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal())
+                continue;
 
-            centralizedAff = false;
+            grp.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
         }
 
         synchronized (mux) {
@@ -1463,22 +1417,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             this.waitInfo = null;
         }
 
-        return centralizedAff;
-    }
-
-    /**
-     * @param fut Exchange future.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void initAffinityNoLateAssignment(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
-        assert !lateAffAssign;
-
-        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-            if (grp.isLocal())
-                continue;
-
-            initAffinity(caches.group(grp.groupId()), grp.affinity(), fut);
-        }
+        return true;
     }
 
     /**
@@ -1488,8 +1427,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      */
     private IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut)
         throws IgniteCheckedException {
-        assert lateAffAssign;
-
         final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>();
 
         forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
@@ -1512,7 +1449,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     cctx.io().addCacheGroupHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class,
                         new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
                             @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-                                processAffinityAssignmentResponse(grpId, nodeId, res);
+                                processAffinityAssignmentResponse(nodeId, res);
                             }
                         }
                     );
@@ -1587,15 +1524,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * @param fut Exchange future.
+     * @param topVer Topology version.
      * @param desc Cache descriptor.
      * @return Cache holder.
      * @throws IgniteCheckedException If failed.
      */
-    private CacheGroupHolder groupHolder(GridDhtPartitionsExchangeFuture fut, final CacheGroupDescriptor desc)
+    private CacheGroupHolder groupHolder(AffinityTopologyVersion topVer, final CacheGroupDescriptor desc)
         throws IgniteCheckedException {
-        assert lateAffAssign;
-
         CacheGroupHolder cacheGrp = grpHolders.get(desc.groupId());
 
         if (cacheGrp != null)
@@ -1607,12 +1542,12 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             cctx.io().addCacheGroupHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class,
                 new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
                     @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-                        processAffinityAssignmentResponse(desc.groupId(), nodeId, res);
+                        processAffinityAssignmentResponse(nodeId, res);
                     }
                 }
             );
 
-            cacheGrp = CacheGroupHolder2.create(cctx, desc, fut.topologyVersion(), null);
+            cacheGrp = CacheGroupHolder2.create(cctx, desc, topVer, null);
         }
         else
             cacheGrp = new CacheGroupHolder1(grp, null);
@@ -1625,17 +1560,18 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * @param fut Exchange future.
+     * @param topVer Topology version.
+     * @param evt Discovery event.
+     * @param discoCache Discovery data cache.
      * @param crd Coordinator flag.
      * @throws IgniteCheckedException If failed.
      * @return Rabalance info.
      */
-    @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final GridDhtPartitionsExchangeFuture fut, boolean crd)
+    @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final AffinityTopologyVersion topVer,
+        final DiscoveryEvent evt,
+        final DiscoCache discoCache,
+        boolean crd)
         throws IgniteCheckedException {
-        assert lateAffAssign;
-
-        AffinityTopologyVersion topVer = fut.topologyVersion();
-
         final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
 
         if (!crd) {
@@ -1645,7 +1581,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 boolean latePrimary = grp.rebalanceEnabled();
 
-                initAffinityOnNodeJoin(fut, grp.affinity(), null, latePrimary, affCache);
+                initAffinityOnNodeJoin(topVer, evt, discoCache, grp.affinity(), null, latePrimary, affCache);
             }
 
             return null;
@@ -1655,11 +1591,17 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
                 @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
-                    CacheGroupHolder cache = groupHolder(fut, desc);
+                    CacheGroupHolder cache = groupHolder(topVer, desc);
 
                     boolean latePrimary = cache.rebalanceEnabled;
 
-                    initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary, affCache);
+                    initAffinityOnNodeJoin(topVer,
+                        evt,
+                        discoCache,
+                        cache.affinity(),
+                        waitRebalanceInfo,
+                        latePrimary,
+                        affCache);
                 }
             });
 
@@ -1668,24 +1610,24 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * @param fut Exchange future.
+     * @param topVer Topology version.
+     * @param evt Discovery event.
+     * @param discoCache Discovery data cache.
      * @param aff Affinity.
      * @param rebalanceInfo Rebalance information.
      * @param latePrimary If {@code true} delays primary assignment if it is not owner.
      * @param affCache Already calculated assignments (to reduce data stored in history).
      * @throws IgniteCheckedException If failed.
      */
-    private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut,
+    private void initAffinityOnNodeJoin(AffinityTopologyVersion topVer,
+        DiscoveryEvent evt,
+        DiscoCache discoCache,
         GridAffinityAssignmentCache aff,
         WaitRebalanceInfo rebalanceInfo,
         boolean latePrimary,
         Map<Object, List<List<ClusterNode>>> affCache)
         throws IgniteCheckedException
     {
-        assert lateAffAssign;
-
-        AffinityTopologyVersion topVer = fut.topologyVersion();
-
         AffinityTopologyVersion affTopVer = aff.lastVersion();
 
         assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() +
@@ -1695,7 +1637,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         assert aff.idealAssignment() != null : "Previous assignment is not available.";
 
-        List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
+        List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, evt, discoCache);
         List<List<ClusterNode>> newAssignment = null;
 
         if (latePrimary) {
@@ -1726,7 +1668,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         if (newAssignment == null)
             newAssignment = idealAssignment;
 
-        aff.initialize(fut.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
+        aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache));
     }
 
     /**
@@ -1762,7 +1704,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         ClusterNode curPrimary,
         List<ClusterNode> newNodes,
         WaitRebalanceInfo rebalance) {
-        assert lateAffAssign;
         assert curPrimary != null;
         assert !F.isEmpty(newNodes);
         assert !curPrimary.equals(newNodes.get(0));
@@ -1791,8 +1732,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      */
     public IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> initAffinityOnNodeLeft(
         final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
-        assert lateAffAssign;
-
         IgniteInternalFuture<?> initFut = initCoordinatorCaches(fut);
 
         if (initFut != null && !initFut.isDone()) {
@@ -1822,8 +1761,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      */
     private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
         throws IgniteCheckedException {
-        assert lateAffAssign;
-
         final AffinityTopologyVersion topVer = fut.topologyVersion();
 
         final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
@@ -1834,7 +1771,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
             @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
-                CacheGroupHolder grpHolder = groupHolder(fut, desc);
+                CacheGroupHolder grpHolder = groupHolder(fut.topologyVersion(), desc);
 
                 if (!grpHolder.rebalanceEnabled)
                     return;