You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/27 15:03:02 UTC

[27/28] ignite git commit: cache discovery data refactoring

cache discovery data refactoring


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0022f6b6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0022f6b6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0022f6b6

Branch: refs/heads/ignite-5075-cacheStart
Commit: 0022f6b6ed8585ca3c56b7282e91a373fcf54117
Parents: b71d4ef
Author: sboikov <sb...@gridgain.com>
Authored: Thu Apr 27 12:40:57 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Apr 27 17:57:40 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 154 +++++++--------
 .../processors/cache/ClusterCachesInfo.java     |  88 ++++++---
 .../cache/DynamicCacheChangeBatch.java          |   6 +
 .../processors/cache/ExchangeActions.java       | 195 ++++++++++++++++---
 .../GridCachePartitionExchangeManager.java      |  38 +---
 .../processors/cache/GridCacheProcessor.java    |  76 +++-----
 .../GridDhtPartitionsExchangeFuture.java        | 115 +++--------
 .../cluster/GridClusterStateProcessor.java      |  22 +--
 8 files changed, 391 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 0958208..d3749f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -323,36 +323,37 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      *
      * @param fut Exchange future.
      * @param crd Coordinator flag.
-     * @param reqs Cache change requests.
+     * @param exchActions Cache change requests.
      * @throws IgniteCheckedException If failed.
      * @return {@code True} if client-only exchange is needed.
      */
     public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut,
         boolean crd,
-        Collection<DynamicCacheChangeRequest> reqs)
+        ExchangeActions exchActions)
         throws IgniteCheckedException {
-        assert !F.isEmpty(reqs) : fut;
+        assert exchActions != null && !exchActions.empty() : fut;
 
-        for (DynamicCacheChangeRequest req : reqs) {
+        for (DynamicCacheChangeRequest req : exchActions.stopRequests()) {
             Integer cacheId = CU.cacheId(req.cacheName());
 
-            if (req.stop()) {
-                DynamicCacheDescriptor desc = registeredCaches.remove(cacheId);
+            DynamicCacheDescriptor desc = registeredCaches.remove(cacheId);
 
-                assert desc != null : cacheId;
-            }
-            else if (req.start() && !req.clientStartOnly()) {
-                DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cctx.kernalContext(),
-                    req.startCacheConfiguration(),
-                    req.cacheType(),
-                    false,
-                    req.deploymentId(),
-                    req.schema());
+            assert desc != null : cacheId;
+        }
 
-                DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc);
+        for (DynamicCacheChangeRequest req : exchActions.startRequests()) {
+            Integer cacheId = CU.cacheId(req.cacheName());
 
-                assert old == null : old;
-            }
+            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cctx.kernalContext(),
+                req.startCacheConfiguration(),
+                req.cacheType(),
+                false,
+                req.deploymentId(),
+                req.schema());
+
+            DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc);
+
+            assert old == null : old;
         }
 
         boolean clientOnly = true;
@@ -367,90 +368,85 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
         });
 
-        Set<Integer> stoppedCaches = null;
-
-        for (DynamicCacheChangeRequest req : reqs) {
-            if (!(req.clientStartOnly() || req.close()))
-                clientOnly = false;
-
+        for (DynamicCacheChangeRequest req : exchActions.startRequests()) {
             Integer cacheId = CU.cacheId(req.cacheName());
 
-            if (req.start()) {
-                cctx.cache().prepareCacheStart(req, fut.topologyVersion());
+            cctx.cache().prepareCacheStart(req, fut.topologyVersion());
 
-                if (fut.isCacheAdded(cacheId, fut.topologyVersion())) {
-                    if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
-                        U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
-                }
+            if (fut.isCacheAdded(cacheId, fut.topologyVersion())) {
+                if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
+                    U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
+            }
 
-                if (!crd || !lateAffAssign) {
-                    GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+            if (!crd || !lateAffAssign) {
+                GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
-                    if (cacheCtx != null && !cacheCtx.isLocal()) {
-                        boolean clientCacheStarted =
-                            req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId());
+                if (cacheCtx != null && !cacheCtx.isLocal()) {
+                    boolean clientCacheStarted =
+                        req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId());
 
-                        if (clientCacheStarted)
-                            initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
-                        else if (!req.clientStartOnly()) {
-                            assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion());
+                    if (clientCacheStarted)
+                        initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
+                    else if (!req.clientStartOnly()) {
+                        assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion());
 
-                            GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
+                        GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
 
-                            assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
+                        assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
 
-                            List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
-                                fut.discoveryEvent(), fut.discoCache());
+                        List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
+                            fut.discoveryEvent(), fut.discoCache());
 
-                            aff.initialize(fut.topologyVersion(), assignment);
-                        }
+                        aff.initialize(fut.topologyVersion(), assignment);
                     }
                 }
-                else
-                    initStartedCacheOnCoordinator(fut, cacheId);
             }
-            else if (req.stop() || req.close()) {
-                cctx.cache().blockGateway(req);
-
-                if (crd) {
-                    boolean rmvCache = false;
+            else
+                initStartedCacheOnCoordinator(fut, cacheId);
+        }
 
-                    if (req.close() && req.initiatingNodeId().equals(cctx.localNodeId())) {
-                        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+        for (DynamicCacheChangeRequest req : exchActions.closeRequests(cctx.localNodeId())) {
+            Integer cacheId = CU.cacheId(req.cacheName());
 
-                        rmvCache = cacheCtx != null && !cacheCtx.affinityNode();
-                    }
-                    else if (req.stop())
-                        rmvCache = true;
+            cctx.cache().blockGateway(req);
 
-                    if (rmvCache) {
-                        CacheHolder cache = caches.remove(cacheId);
+            if (crd) {
+                GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
-                        if (cache != null) {
-                            if (!req.stop()) {
-                                assert !cache.client();
+                assert cacheCtx != null : req;
 
-                                cache = CacheHolder2.create(cctx,
-                                    cctx.cache().cacheDescriptor(cacheId),
-                                    fut,
-                                    cache.affinity());
+                CacheHolder cache = caches.remove(cacheId);
 
-                                caches.put(cacheId, cache);
-                            }
-                            else {
-                                if (stoppedCaches == null)
-                                    stoppedCaches = new HashSet<>();
+                assert !cache.client();
 
-                                stoppedCaches.add(cache.cacheId());
+                cache = CacheHolder2.create(cctx,
+                    cctx.cache().cacheDescriptor(cacheId),
+                    fut,
+                    cache.affinity());
 
-                                cctx.io().removeHandler(cacheId, GridDhtAffinityAssignmentResponse.class);
-                            }
-                        }
-                    }
-                }
+                caches.put(cacheId, cache);
             }
         }
 
+        Set<Integer> stoppedCaches = null;
+
+        for (DynamicCacheChangeRequest req : exchActions.stopRequests()) {
+            Integer cacheId = CU.cacheId(req.cacheName());
+
+            cctx.cache().blockGateway(req);
+
+            CacheHolder cache = caches.remove(cacheId);
+
+            assert cache != null : req;
+
+            if (stoppedCaches == null)
+                stoppedCaches = new HashSet<>();
+
+            stoppedCaches.add(cache.cacheId());
+
+            cctx.io().removeHandler(cacheId, GridDhtAffinityAssignmentResponse.class);
+        }
+
         if (stoppedCaches != null) {
             boolean notify = false;
 
@@ -479,7 +475,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
         }
 
-        return clientOnly;
+        return exchActions.clientOnlyExchange();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 8824a48..cd2cd77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -17,15 +17,19 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheExistsException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.query.QuerySchema;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 
@@ -90,7 +94,9 @@ class ClusterCachesInfo {
 
         boolean incMinorTopVer = false;
 
-        List<DynamicCacheDescriptor> added = null;
+        List<DynamicCacheDescriptor> addedDescs = new ArrayList<>();
+
+        final List<T2<DynamicCacheChangeRequest, AffinityTopologyVersion>> reqsToComplete = new ArrayList<>();
 
         for (DynamicCacheChangeRequest req : batch.requests()) {
             if (req.template()) {
@@ -114,10 +120,7 @@ class ClusterCachesInfo {
 
                     assert old == null;
 
-                    if (added == null)
-                        added = new ArrayList<>();
-
-                    added.add(templateDesc);
+                    addedDescs.add(templateDesc);
                 }
 
                 ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
@@ -129,6 +132,8 @@ class ClusterCachesInfo {
 
             boolean needExchange = false;
 
+            AffinityTopologyVersion waitTopVer = null;
+
             if (req.start()) {
                 if (desc == null) {
                     if (req.clientStartOnly()) {
@@ -164,12 +169,9 @@ class ClusterCachesInfo {
                             req.initiatingNodeId(),
                             req.nearCacheConfiguration() != null);
 
-                        if (added == null)
-                            added = new ArrayList<>();
+                        addedDescs.add(startDesc);
 
-                        added.add(startDesc);
-
-                        exchangeActions.addCacheToStart(req, desc);
+                        exchangeActions.addCacheToStart(req, startDesc);
 
                         needExchange = true;
                     }
@@ -208,15 +210,23 @@ class ClusterCachesInfo {
                     }
                 }
 
-                if (!needExchange && desc != null) {
-                    if (desc.clientCacheStartVersion() != null)
-                        req.cacheFutureTopologyVersion(desc.clientCacheStartVersion());
-                    else
-                        req.cacheFutureTopologyVersion(desc.startTopologyVersion());
+                if (!needExchange) {
+                    if (desc != null) {
+                        if (desc.clientCacheStartVersion() != null)
+                            waitTopVer = desc.clientCacheStartVersion();
+                        else
+                            waitTopVer = desc.startTopologyVersion();
+                    }
                 }
             }
-            else if (req.globalStateChange() || req.resetLostPartitions())
+            else if (req.globalStateChange())
                 needExchange = true;
+            else if (req.resetLostPartitions()) {
+                needExchange = desc != null;
+
+                if (needExchange)
+                    exchangeActions.addCacheToResetLostPartitions(req, desc);
+            }
             else {
                 assert req.stop() ^ req.close() : req;
 
@@ -230,29 +240,63 @@ class ClusterCachesInfo {
 
                         needExchange = true;
 
-                        exchangeActions.addCacheToStop(req);
+                        exchangeActions.addCacheToStop(req, desc);
                     }
                     else {
                         assert req.close() : req;
 
                         needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
 
-                        if (needExchange)
-                            exchangeActions.addCacheToStop(req);
+                        if (needExchange) {
+                            exchangeActions.addCacheToStop(req, desc);
+
+                            exchangeActions.addCacheToClose(req, desc);
+                        }
                     }
                 }
             }
 
-            incMinorTopVer |= needExchange;
+            if (!needExchange) {
+                if (req.initiatingNodeId().equals(ctx.localNodeId()))
+                    reqsToComplete.add(new T2<>(req, waitTopVer));
+            }
+            else
+                incMinorTopVer = true;
         }
 
-        if (added != null) {
+        if (!F.isEmpty(addedDescs)) {
             AffinityTopologyVersion startTopVer = incMinorTopVer ? topVer.nextMinorVersion() : topVer;
 
-            for (DynamicCacheDescriptor desc : added)
+            for (DynamicCacheDescriptor desc : addedDescs)
                 desc.startTopologyVersion(startTopVer);
         }
 
+        if (!F.isEmpty(reqsToComplete)) {
+            ctx.closure().callLocalSafe(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    for (T2<DynamicCacheChangeRequest, AffinityTopologyVersion> t :reqsToComplete) {
+                        final DynamicCacheChangeRequest req = t.get1();
+                        AffinityTopologyVersion waitTopVer = t.get2();
+
+                        IgniteInternalFuture<?> fut = waitTopVer != null ?
+                            ctx.cache().context().exchange().affinityReadyFuture(waitTopVer) : null;
+
+                        if (fut == null || fut.isDone())
+                            ctx.cache().completeCacheStartFuture(req, null);
+                        else {
+                            fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?> fut) {
+                                    ctx.cache().completeCacheStartFuture(req, null);
+                                }
+                            });
+                        }
+                    }
+
+                    return null;
+                }
+            });
+        }
+
         if (incMinorTopVer) {
             assert !exchangeActions.empty() : exchangeActions;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index e44bfcc..f69246e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -82,7 +82,13 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
         return exchangeActions != null && !exchangeActions.empty();
     }
 
+    ExchangeActions exchangeActions() {
+        return exchangeActions;
+    }
+
     void exchangeActions(ExchangeActions exchangeActions) {
+        assert !exchangeActions.empty() : exchangeActions;
+
         this.exchangeActions = exchangeActions;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 5adafc9..a7b62ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -17,6 +17,13 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.internal.util.typedef.F;
 
 import java.util.ArrayList;
@@ -27,64 +34,190 @@ import java.util.List;
  */
 public class ExchangeActions {
     /** */
-    private List<ActionData<DynamicCacheDescriptor>> cachesToStart;
+    private Map<String, ActionData> cachesToStart;
 
     /** */
-    private List<ActionData<DynamicCacheDescriptor>> clientCachesToStart;
+    private Map<String, ActionData> clientCachesToStart;
 
     /** */
-    private List<ActionData<String>> cachesToStop;
+    private Map<String, ActionData> cachesToStop;
 
     /** */
-    private List<ActionData<String>> cachesToClose;
+    private Map<String, ActionData> cachesToClose;
 
-    void addCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
-        if (cachesToStart == null)
-            cachesToStart = new ArrayList<>();
+    /** */
+    private Map<String, ActionData> cachesToResetLostParts;
+
+    /** */
+    private ClusterState newState;
 
-        cachesToStart.add(new ActionData<>(req, desc));
+    public boolean clientOnlyExchange() {
+        return F.isEmpty(cachesToStart) &&
+            F.isEmpty(cachesToStop) &&
+            F.isEmpty(cachesToResetLostParts);
     }
 
-    void addClientCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
-        if (clientCachesToStart == null)
-            clientCachesToStart = new ArrayList<>();
+    public List<DynamicCacheChangeRequest> closeRequests(UUID nodeId) {
+        List<DynamicCacheChangeRequest> res = null;
+
+        if (cachesToClose != null) {
+            for (ActionData req : cachesToClose.values()) {
+                if (nodeId.equals(req.req.initiatingNodeId())) {
+                    if (res == null)
+                        res = new ArrayList<>(cachesToClose.size());
+
+                    res.add(req.req);
+                }
+            }
+        }
+
+        return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
+    }
+
+    public List<DynamicCacheChangeRequest> startRequests() {
+        List<DynamicCacheChangeRequest> res = null;
+
+        if (cachesToStart != null) {
+            res = new ArrayList<>(cachesToStart.size());
+
+            for (ActionData req : cachesToStart.values())
+                res.add(req.req);
+        }
 
-        clientCachesToStart.add(new ActionData<>(req, desc));
+        return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
     }
 
-    void addCacheToStop(DynamicCacheChangeRequest req) {
-        if (cachesToStop == null)
-            cachesToStop = new ArrayList<>();
+    public List<DynamicCacheChangeRequest> stopRequests() {
+        List<DynamicCacheChangeRequest> res = null;
+
+        if (cachesToStop != null) {
+            res = new ArrayList<>(cachesToStop.size());
+
+            for (ActionData req : cachesToStop.values())
+                res.add(req.req);
+        }
 
-        cachesToStop.add(new ActionData<>(req, req.cacheName()));
+        return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
     }
 
-    void addCacheToClose(DynamicCacheChangeRequest req) {
-        if (cachesToClose == null)
-            cachesToClose = new ArrayList<>();
+    public void completeRequestFutures(GridCacheSharedContext ctx) {
+        completeRequestFutures(cachesToStart, ctx);
+        completeRequestFutures(clientCachesToStart, ctx);
+        completeRequestFutures(cachesToStop, ctx);
+        completeRequestFutures(cachesToClose, ctx);
+        completeRequestFutures(cachesToResetLostParts, ctx);
+    }
 
-        cachesToClose.add(new ActionData<>(req, req.cacheName()));
+    private void completeRequestFutures(Map<String, ActionData> map, GridCacheSharedContext ctx) {
+        if (map != null) {
+            for (ActionData req : map.values())
+                ctx.cache().completeCacheStartFuture(req.req, null);
+        }
     }
 
-    boolean empty() {
-        return F.isEmpty(cachesToStart) &&
-            F.isEmpty(clientCachesToStart) &&
-            F.isEmpty(cachesToStop) &&
-            F.isEmpty(cachesToClose);
+    public boolean hasStop() {
+        return !F.isEmpty(cachesToStop);
+    }
+
+    public Set<String> cachesToResetLostPartitions() {
+        Set<String> caches = null;
+        
+        if (cachesToResetLostParts != null)
+            caches = new HashSet<>(cachesToResetLostParts.keySet());
+
+        return caches != null ? caches : Collections.<String>emptySet();
+    }
+    
+    public boolean cacheStopped(int cacheId) {
+        if (cachesToStop != null) {
+            for (ActionData cache : cachesToStop.values()) {
+                if (cache.desc.cacheId() == cacheId)
+                    return true;
+            }
+        }
+
+        return false;
     }
 
-    void addFutureToComplete() {
+    public boolean cacheStarted(int cacheId) {
+        if (cachesToStart != null) {
+            for (ActionData cache : cachesToStart.values()) {
+                if (cache.desc.cacheId() == cacheId)
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    public boolean clientCacheStarted(UUID nodeId) {
+        if (clientCachesToStart != null) {
+            for (ActionData cache : clientCachesToStart.values()) {
+                if (nodeId.equals(cache.req.initiatingNodeId()))
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    public ClusterState newClusterState() {
+        return newState;
+    }
+
+    private Map<String, ActionData> add(Map<String, ActionData> map, DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        assert req != null;
+        assert desc != null;
+
+        if (map == null)
+            map = new HashMap<>();
+
+        ActionData old = map.put(req.cacheName(), new ActionData(req, desc));
+
+        assert old == null : old;
+
+        return map;
+    }
 
+    void addCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        cachesToStart = add(cachesToStart, req, desc);
+    }
+
+    void addClientCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        clientCachesToStart = add(clientCachesToStart, req, desc);
+    }
+
+    void addCacheToStop(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        cachesToStop = add(cachesToStop, req, desc);
+    }
+
+    void addCacheToClose(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        cachesToClose = add(cachesToClose, req, desc);
+    }
+
+    void addCacheToResetLostPartitions(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        cachesToResetLostParts = add(cachesToResetLostParts, req, desc);
+    }
+
+    public boolean empty() {
+        return F.isEmpty(cachesToStart) &&
+            F.isEmpty(clientCachesToStart) &&
+            F.isEmpty(cachesToStop) &&
+            F.isEmpty(cachesToClose) &&
+            F.isEmpty(cachesToResetLostParts);
     }
 
-    static class ActionData<T> {
-        DynamicCacheChangeRequest req;
+    /**
+     *
+     */
+    static class ActionData {
+        private DynamicCacheChangeRequest req;
 
-        T data;
+        private DynamicCacheDescriptor desc;
 
-        public ActionData(DynamicCacheChangeRequest req, T data) {
+        public ActionData(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
             this.req = req;
-            this.data = data;
+            this.desc = desc;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4775ea1..c2b0e27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -235,34 +235,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (customMsg instanceof DynamicCacheChangeBatch) {
                         DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
 
-                        Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
-
-                        // Validate requests to check if event should trigger partition exchange.
-                        for (final DynamicCacheChangeRequest req : batch.requests()) {
-                            if (req.exchangeNeeded())
-                                valid.add(req);
-                            else {
-                                IgniteInternalFuture<?> fut = null;
-
-                                if (req.cacheFutureTopologyVersion() != null)
-                                    fut = affinityReadyFuture(req.cacheFutureTopologyVersion());
-
-                                if (fut == null || fut.isDone())
-                                    cctx.cache().completeStartFuture(req);
-                                else {
-                                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                                        @Override public void apply(IgniteInternalFuture<?> fut) {
-                                            cctx.cache().completeStartFuture(req);
-                                        }
-                                    });
-                                }
-                            }
-                        }
+                        ExchangeActions exchActions = batch.exchangeActions();
 
-                        if (!F.isEmpty(valid) && !(valid.size() == 1 && valid.iterator().next().globalStateChange())) {
+                        if (exchActions != null) {
                             exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
 
-                            exchFut = exchangeFuture(exchId, evt, cache, valid, null);
+                            exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
                         }
                     }
                     else if (customMsg instanceof CacheAffinityChangeMessage) {
@@ -1123,25 +1101,25 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param exchId Exchange ID.
      * @param discoEvt Discovery event.
      * @param cache Discovery data cache.
-     * @param reqs Cache change requests.
+     * @param exchActions Cache change actions.
      * @param affChangeMsg Affinity change message.
      * @return Exchange future.
      */
     private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
         @Nullable DiscoveryEvent discoEvt,
         @Nullable DiscoCache cache,
-        @Nullable Collection<DynamicCacheChangeRequest> reqs,
+        @Nullable ExchangeActions exchActions,
         @Nullable CacheAffinityChangeMessage affChangeMsg) {
         GridDhtPartitionsExchangeFuture fut;
 
         GridDhtPartitionsExchangeFuture old = exchFuts.addx(
-            fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs, affChangeMsg));
+            fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, exchActions, affChangeMsg));
 
         if (old != null) {
             fut = old;
 
-            if (reqs != null)
-                fut.cacheChangeRequests(reqs);
+            if (exchActions != null)
+                fut.exchangeActions(exchActions);
 
             if (affChangeMsg != null)
                 fut.affinityChangeMessage(affChangeMsg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 4f86dac..55690b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2031,13 +2031,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * Callback invoked when first exchange future for dynamic cache is completed.
      *
      * @param topVer Completed topology version.
-     * @param reqs Change requests.
+     * @param exchActions Change requests.
      * @param err Error.
      */
     @SuppressWarnings("unchecked")
     public void onExchangeDone(
         AffinityTopologyVersion topVer,
-        Collection<DynamicCacheChangeRequest> reqs,
+        ExchangeActions exchActions,
         Throwable err
     ) {
         for (GridCacheAdapter<?, ?> cache : caches.values()) {
@@ -2053,48 +2053,52 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             }
         }
 
-        if (!F.isEmpty(reqs) && err == null) {
-            for (DynamicCacheChangeRequest req : reqs) {
-                String masked = req.cacheName();
+        if (exchActions != null && err == null) {
+            for (DynamicCacheChangeRequest req : exchActions.stopRequests()) {
+                stopGateway(req);
 
-                if (req.stop()) {
-                    stopGateway(req);
+                prepareCacheStop(req);
+            }
 
-                    prepareCacheStop(req);
-                }
-                else if (req.close() && req.initiatingNodeId().equals(ctx.localNodeId())) {
-                    IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(masked);
+            for (DynamicCacheChangeRequest req : exchActions.closeRequests(ctx.localNodeId())) {
+                String cacheName = req.cacheName();
 
-                    if (proxy != null) {
-                        if (proxy.context().affinityNode()) {
-                            GridCacheAdapter<?, ?> cache = caches.get(masked);
+                IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(cacheName);
 
-                            if (cache != null)
-                                jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false));
-                        }
-                        else {
-                            proxy.context().gate().onStopped();
+                if (proxy != null) {
+                    if (proxy.context().affinityNode()) {
+                        GridCacheAdapter<?, ?> cache = caches.get(cacheName);
 
-                            prepareCacheStop(req);
-                        }
+                        if (cache != null)
+                            jCacheProxies.put(cacheName, new IgniteCacheProxy(cache.context(), cache, null, false));
+                    }
+                    else {
+                        proxy.context().gate().onStopped();
+
+                        prepareCacheStop(req);
                     }
                 }
             }
         }
     }
 
+    void completeTemplateAddFuture(String name, IgniteUuid deploymentId) {
+        GridCacheProcessor.TemplateConfigurationFuture fut =
+            (GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(name);
+
+        if (fut != null && fut.deploymentId().equals(deploymentId))
+            fut.onDone();
+    }
+
     /**
      * @param req Request to complete future for.
      */
-    public void completeStartFuture(DynamicCacheChangeRequest req) {
+    public void completeCacheStartFuture(DynamicCacheChangeRequest req, Exception err) {
         if (req.initiatingNodeId().equals(ctx.localNodeId())) {
             DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
 
-            assert req.deploymentId() != null || req.globalStateChange() || req.resetLostPartitions();
-            assert fut == null || req.globalStateChange() || req.resetLostPartitions();
-
             if (fut != null)
-                fut.onDone();
+                fut.onDone(null, err);
         }
     }
 
@@ -2506,8 +2510,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             DynamicCacheDescriptor desc = cacheDescriptor(cacheName);
 
             if (desc == null) {
-                log.warning("Reset lost partition will not be executed, " +
-                    "because cache with name:" + cacheName + " doesn't not exist");
+                U.warn(log, "Failed to find cache for reset lost partition request, cache does not exist: " + cacheName);
 
                 continue;
             }
@@ -2733,23 +2736,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         return false;
     }
 
-    void completeTemplateAddFuture(String name, IgniteUuid deploymentId) {
-        GridCacheProcessor.TemplateConfigurationFuture fut =
-            (GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(name);
-
-        if (fut != null && fut.deploymentId().equals(deploymentId))
-            fut.onDone();
-    }
-
-    void completeCacheStartFuture(DynamicCacheChangeRequest req, Exception err) {
-        if (ctx.localNodeId().equals(req.initiatingNodeId())) {
-            DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
-
-            if (fut != null)
-                fut.onDone(err);
-        }
-    }
-
     /**
      * @param req Cache change request.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3c056fd..320480c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -110,9 +111,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
         IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);
 
-    /** */
-    private static final long serialVersionUID = 0L;
-
     /** Dummy flag. */
     private final boolean dummy;
 
@@ -190,8 +188,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /** Logger. */
     private final IgniteLogger log;
 
-    /** Dynamic cache change requests. */
-    private Collection<DynamicCacheChangeRequest> reqs;
+    /** Cache change requests. */
+    private ExchangeActions exchActions;
 
     /** */
     private CacheAffinityChangeMessage affChangeMsg;
@@ -284,19 +282,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @param cctx Cache context.
      * @param busyLock Busy lock.
      * @param exchId Exchange ID.
-     * @param reqs Cache change requests.
+     * @param exchActions Cache change requests.
      * @param affChangeMsg Affinity change message.
      */
     public GridDhtPartitionsExchangeFuture(
         GridCacheSharedContext cctx,
         ReadWriteLock busyLock,
         GridDhtPartitionExchangeId exchId,
-        Collection<DynamicCacheChangeRequest> reqs,
+        ExchangeActions exchActions,
         CacheAffinityChangeMessage affChangeMsg
     ) {
         assert busyLock != null;
         assert exchId != null;
         assert exchId.topologyVersion() != null;
+        assert exchActions == null || !exchActions.empty();
 
         dummy = false;
         forcePreload = false;
@@ -305,7 +304,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         this.cctx = cctx;
         this.busyLock = busyLock;
         this.exchId = exchId;
-        this.reqs = reqs;
+        this.exchActions = exchActions;
         this.affChangeMsg = affChangeMsg;
 
         log = cctx.logger(getClass());
@@ -317,10 +316,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * @param reqs Cache change requests.
+     * @param exchActions Exchange actions.
      */
-    public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) {
-        this.reqs = reqs;
+    public void exchangeActions(ExchangeActions exchActions) {
+        assert exchActions == null || !exchActions.empty();
+        assert evtLatch != null && evtLatch.getCount() == 1L : this;
+
+        this.exchActions = exchActions;
     }
 
     /**
@@ -396,16 +398,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @return {@code True} if non-client cache was added during this exchange.
      */
     public boolean cacheStarted(int cacheId) {
-        if (!F.isEmpty(reqs)) {
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (req.start() && !req.clientStartOnly()) {
-                    if (CU.cacheId(req.cacheName()) == cacheId)
-                        return true;
-                }
-            }
-        }
+        return exchActions != null && exchActions.cacheStarted(cacheId);
 
-        return false;
     }
 
     /**
@@ -435,14 +429,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      *
      */
     public ClusterState newClusterState() {
-        if (!F.isEmpty(reqs)) {
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (req.globalStateChange())
-                    return req.state();
-            }
-        }
-
-        return null;
+        return exchActions != null ? exchActions.newClusterState() : null;
     }
 
     /**
@@ -524,7 +511,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)discoEvt).customMessage();
 
                 if (msg instanceof DynamicCacheChangeBatch){
-                    assert !F.isEmpty(reqs);
+                    assert exchActions != null && !exchActions.empty();
 
                     exchange = onCacheChangeRequest(crdNode);
                 }
@@ -558,20 +545,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             updateTopologies(crdNode);
 
-            if (!F.isEmpty(reqs)) {
-                boolean hasStop = false;
-
-                for (DynamicCacheChangeRequest req : reqs) {
-                    if (req.stop()) {
-                        hasStop = true;
-
-                        break;
-                    }
-                }
-
-                if (hasStop)
-                    cctx.cache().context().database().beforeCachesStop();
-            }
+            if (exchActions != null && exchActions.hasStop())
+                cctx.cache().context().database().beforeCachesStop();
 
             switch (exchange) {
                 case ALL: {
@@ -679,32 +654,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedException {
-        assert !F.isEmpty(reqs) : this;
+        assert exchActions != null && !exchActions.empty() : this;
 
         GridClusterStateProcessor stateProc = cctx.kernalContext().state();
 
-        if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(reqs, topologyVersion())) {
+        if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(exchActions, topologyVersion())) {
             changeGlobalStateE = stateProc.onChangeGlobalState();
 
             if (crd && changeGlobalStateE != null)
                 changeGlobalStateExceptions.put(cctx.localNodeId(), changeGlobalStateE);
         }
 
-        boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, reqs);
+        boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
 
-        if (clientOnly) {
-            boolean clientCacheStarted = false;
-
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (req.start() && req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId())) {
-                    clientCacheStarted = true;
-
-                    break;
-                }
-            }
-
-            return clientCacheStarted ? ExchangeType.CLIENT : ExchangeType.NONE;
-        }
+        if (clientOnly)
+            return exchActions.clientCacheStarted(cctx.localNodeId()) ? ExchangeType.CLIENT : ExchangeType.NONE;
         else
             return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
     }
@@ -1051,19 +1015,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @return {@code True} if cache is stopping by this exchange.
      */
     public boolean stopping(int cacheId) {
-        boolean stopping = false;
-
-        if (!F.isEmpty(reqs)) {
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (cacheId == CU.cacheId(req.cacheName())) {
-                    stopping = req.stop();
-
-                    break;
-                }
-            }
-        }
-
-        return stopping;
+        return exchActions != null && exchActions.cacheStopped(cacheId);
     }
 
     /**
@@ -1074,13 +1026,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         assert node != null;
 
         // Reset lost partition before send local partition to coordinator.
-        if (!F.isEmpty(reqs)) {
-            Set<String> caches = new HashSet<>();
-
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (req.resetLostPartitions())
-                    caches.add(req.cacheName());
-            }
+        if (exchActions != null) {
+            Set<String> caches = exchActions.cachesToResetLostPartitions();
 
             if (!F.isEmpty(caches))
                 resetLostPartitions(caches);
@@ -1208,14 +1155,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             cacheValidRes = m;
         }
 
-        cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err);
+        cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, err);
 
         cctx.exchange().onExchangeDone(this, err);
 
-        if (!F.isEmpty(reqs) && err == null) {
-            for (DynamicCacheChangeRequest req : reqs)
-                cctx.cache().completeStartFuture(req);
-        }
+        if (exchActions != null && err == null)
+            exchActions.completeRequestFutures(cctx);
 
         if (exchangeOnChangeGlobalState && err == null)
             cctx.kernalContext().state().onExchangeDone();
@@ -1232,7 +1177,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     cacheCtx.config().getAffinity().removeNode(exchId.nodeId());
             }
 
-            reqs = null;
+            exchActions = null;
 
             if (discoEvt instanceof DiscoveryCustomEvent)
                 ((DiscoveryCustomEvent)discoEvt).customMessage(null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/0022f6b6/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 1286ba9..a20ee41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.cache.ClusterState;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
@@ -329,26 +330,25 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param reqs Requests.
+     * @param exchActions Requests.
+     * @param topVer Exchange topology version.
      */
     public boolean changeGlobalState(
-        Collection<DynamicCacheChangeRequest> reqs,
+        ExchangeActions exchActions,
         AffinityTopologyVersion topVer
     ) {
-        assert !F.isEmpty(reqs);
+        assert exchActions != null;
         assert topVer != null;
 
-        for (DynamicCacheChangeRequest req : reqs)
-            if (req.globalStateChange()) {
-                ChangeGlobalStateContext cgsCtx = lastCgsCtx;
+        if (exchActions.newClusterState() != null) {
+            ChangeGlobalStateContext cgsCtx = lastCgsCtx;
 
-                assert cgsCtx != null : "reqs: " + Arrays.toString(reqs.toArray());
+            assert cgsCtx != null : exchActions;
 
-                cgsCtx.topologyVersion(topVer);
-
-                return true;
-            }
+            cgsCtx.topologyVersion(topVer);
 
+            return true;
+        }
 
         return false;
     }