You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/18 08:34:13 UTC

[1/6] ignite git commit: Moved logic related to caches discovery data handling to ClusterCachesInfo. Start of statically configured caches in the same way as dynamic ones: from GridDhtPartitionsExchangeFuture.

Repository: ignite
Updated Branches:
  refs/heads/master e84d23787 -> 201769640


http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 1023140..e423098 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheInvokeReadThroughS
 import org.apache.ignite.internal.processors.cache.IgniteCacheInvokeReadThroughTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheReadThroughStoreCallTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheStartTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheTxLocalPeekModesTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheTxNearPeekModesTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheTxPeekModesTest;
@@ -83,6 +84,7 @@ import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest
 import org.apache.ignite.internal.processors.cache.MarshallerCacheJobRunNodeRestartTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheDiscoveryDataConcurrentJoinTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheStartOnJoinTest;
@@ -222,6 +224,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(IgniteCacheCreatePutMultiNodeSelfTest.class);
         suite.addTestSuite(IgniteCacheCreatePutTest.class);
         suite.addTestSuite(CacheStartOnJoinTest.class);
+        suite.addTestSuite(IgniteCacheStartTest.class);
+        suite.addTestSuite(CacheDiscoveryDataConcurrentJoinTest.class);
 
         suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
index 9389646..0fd16f0 100644
--- a/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
@@ -77,7 +77,7 @@ public class CacheJdbcPojoStoreFactorySelfTest extends GridCommonAbstractTest {
                 }
                 return null;
             }
-        }, IgniteException.class, "Failed to load bean in application context");
+        }, IgniteException.class, "Spring bean with provided name doesn't exist");
     }
 
     /**


[6/6] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/20176964
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/20176964
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/20176964

Branch: refs/heads/master
Commit: 201769640cf4a48ecb0728d5edb0b16c682ee46b
Parents: 8600200 e84d237
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 18 11:33:54 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 18 11:33:54 2017 +0300

----------------------------------------------------------------------
 .../dotnet/Apache.Ignite.Core/Binary/TimestampAttribute.cs         | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------



[5/6] ignite git commit: Moved logic related to caches discovery data handling to ClusterCachesInfo. Start of statically configured caches in the same way as dynamic ones: from GridDhtPartitionsExchangeFuture.

Posted by sb...@apache.org.
Moved logic related to caches discovery data handling to ClusterCachesInfo.
Statically configured caches are now started in the same way as dynamic ones: from GridDhtPartitionsExchangeFuture.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/86002002
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/86002002
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/86002002

Branch: refs/heads/master
Commit: 86002002fda063eb33431e8bfd3f3a0df047325b
Parents: 20f4d18
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 18 11:33:25 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 18 11:33:25 2017 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |   11 +-
 .../affinity/AffinityTopologyVersion.java       |    9 +
 .../cache/CacheAffinitySharedManager.java       |  286 ++--
 .../CacheClientReconnectDiscoveryData.java      |  133 ++
 .../internal/processors/cache/CacheData.java    |  157 ++
 .../cache/CacheJoinNodeDiscoveryData.java       |  147 ++
 .../cache/CacheNodeCommonDiscoveryData.java     |   82 +
 .../processors/cache/ClusterCachesInfo.java     |  903 +++++++++++
 .../cache/DynamicCacheChangeBatch.java          |   92 +-
 .../cache/DynamicCacheChangeRequest.java        |  102 +-
 .../cache/DynamicCacheDescriptor.java           |  171 +--
 .../processors/cache/ExchangeActions.java       |  338 +++++
 .../processors/cache/GridCacheContext.java      |   26 +-
 .../GridCachePartitionExchangeManager.java      |   54 +-
 .../processors/cache/GridCacheProcessor.java    | 1413 +++++-------------
 .../dht/GridClientPartitionTopology.java        |    2 +-
 .../dht/GridDhtAffinityAssignmentRequest.java   |   36 +-
 .../dht/GridDhtAffinityAssignmentResponse.java  |   36 +-
 .../dht/GridDhtAssignmentFetchFuture.java       |   63 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   12 +-
 .../GridDhtAtomicAbstractUpdateRequest.java     |    2 +
 .../GridDhtPartitionsExchangeFuture.java        |  160 +-
 .../dht/preloader/GridDhtPreloader.java         |    4 +-
 .../processors/cache/local/GridLocalCache.java  |   31 +-
 .../cache/local/GridLocalLockFuture.java        |   41 +-
 .../cache/query/GridCacheQueryManager.java      |    3 +-
 .../continuous/CacheContinuousQueryHandler.java |   42 +-
 .../continuous/CacheContinuousQueryManager.java |   11 +-
 .../cluster/GridClusterStateProcessor.java      |   26 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |    2 +-
 .../ignite/spi/discovery/DiscoveryDataBag.java  |   18 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |    4 +-
 .../core/src/test/config/examples.properties    |    1 +
 ...ityFunctionBackupFilterAbstractSelfTest.java |   13 +-
 .../internal/GridNodeMetricsLogSelfTest.java    |   13 +-
 .../GridCacheAbstractLocalStoreSelfTest.java    |   41 +-
 ...gniteCacheP2pUnmarshallingNearErrorTest.java |    6 +-
 .../processors/cache/IgniteCacheStartTest.java  |  191 +++
 .../cache/IgniteDynamicCacheStartSelfTest.java  |    2 +-
 ...niteTopologyValidatorGridSplitCacheTest.java |    8 +-
 .../CacheDiscoveryDataConcurrentJoinTest.java   |  198 +++
 .../CacheLateAffinityAssignmentTest.java        |    4 +-
 .../cache/distributed/CacheStartOnJoinTest.java |   10 +
 .../IgniteCrossCacheTxStoreSelfTest.java        |   12 +-
 ...ePartitionedBasicStoreMultiNodeSelfTest.java |   22 +-
 .../loadtests/hashmap/GridCacheTestContext.java |    4 +
 .../testsuites/IgniteCacheTestSuite4.java       |    4 +
 .../jdbc/CacheJdbcPojoStoreFactorySelfTest.java |    2 +-
 48 files changed, 3198 insertions(+), 1750 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index e5f2278..24c7283 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -315,8 +315,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         boolean nearEnabled,
         CacheMode cacheMode
     ) {
-        if (!registeredCaches.containsKey(cacheName))
+        if (!registeredCaches.containsKey(cacheName)) {
+            if (cacheMode == CacheMode.REPLICATED)
+                nearEnabled = false;
+            
             registeredCaches.put(cacheName, new CachePredicate(filter, nearEnabled, cacheMode));
+        }
     }
 
     /**
@@ -2737,7 +2741,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
          * @return {@code True} if this node is a data node for given cache.
          */
         public boolean dataNode(ClusterNode node) {
-            return !node.isDaemon() && CU.affinityNode(node, cacheFilter);
+            return CU.affinityNode(node, cacheFilter);
         }
 
         /**
@@ -2753,9 +2757,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
          * @return {@code True} if near cache is present on the given nodes.
          */
         public boolean nearNode(ClusterNode node) {
-            if (node.isDaemon())
-                return false;
-
             if (CU.affinityNode(node, cacheFilter))
                 return nearEnabled;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index f564e28..8669530 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -73,6 +73,15 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
     }
 
     /**
+     * @return Topology version with incremented minor version.
+     */
+    public AffinityTopologyVersion nextMinorVersion() {
+        assert topVer > 0;
+
+        return new AffinityTopologyVersion(topVer, minorTopVer + 1);
+    }
+
+    /**
      * @return Topology version.
      */
     public long topologyVersion() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 0958208..cec42a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -32,6 +32,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -48,7 +49,6 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgniteInClosureX;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
@@ -90,8 +90,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private final Object mux = new Object();
 
     /** Pending affinity assignment futures. */
-    private final ConcurrentMap<T2<Integer, AffinityTopologyVersion>, GridDhtAssignmentFetchFuture>
-        pendingAssignmentFetchFuts = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<Long, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
+        new ConcurrentHashMap8<>();
 
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@@ -312,50 +312,59 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             cctx.io().addHandler(cacheId, GridDhtAffinityAssignmentResponse.class,
                 new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
                     @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-                        processAffinityAssignmentResponse(cacheId, nodeId, res);
+                        processAffinityAssignmentResponse(nodeId, res);
                     }
                 });
         }
     }
 
     /**
+     * @param exchActions Cache change requests to execute on exchange.
+     */
+    private void updateCachesInfo(ExchangeActions exchActions) {
+        for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
+            DynamicCacheDescriptor desc = registeredCaches.remove(action.descriptor().cacheId());
+
+            assert desc != null : action.request().cacheName();
+        }
+
+        for (ExchangeActions.ActionData action : exchActions.cacheStartRequests()) {
+            DynamicCacheChangeRequest req = action.request();
+
+            Integer cacheId = action.descriptor().cacheId();
+
+            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cctx.kernalContext(),
+                req.startCacheConfiguration(),
+                req.cacheType(),
+                false,
+                action.descriptor().receivedFrom(),
+                action.descriptor().staticallyConfigured(),
+                req.deploymentId(),
+                req.schema());
+
+            DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc);
+
+            assert old == null : old;
+        }
+    }
+
+    /**
      * Called on exchange initiated for cache start/stop request.
      *
      * @param fut Exchange future.
      * @param crd Coordinator flag.
-     * @param reqs Cache change requests.
+     * @param exchActions Cache change requests.
      * @throws IgniteCheckedException If failed.
      * @return {@code True} if client-only exchange is needed.
      */
     public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut,
         boolean crd,
-        Collection<DynamicCacheChangeRequest> reqs)
-        throws IgniteCheckedException {
-        assert !F.isEmpty(reqs) : fut;
-
-        for (DynamicCacheChangeRequest req : reqs) {
-            Integer cacheId = CU.cacheId(req.cacheName());
-
-            if (req.stop()) {
-                DynamicCacheDescriptor desc = registeredCaches.remove(cacheId);
-
-                assert desc != null : cacheId;
-            }
-            else if (req.start() && !req.clientStartOnly()) {
-                DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cctx.kernalContext(),
-                    req.startCacheConfiguration(),
-                    req.cacheType(),
-                    false,
-                    req.deploymentId(),
-                    req.schema());
-
-                DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc);
-
-                assert old == null : old;
-            }
-        }
+        ExchangeActions exchActions)
+        throws IgniteCheckedException
+    {
+        assert exchActions != null && !exchActions.empty() : exchActions;
 
-        boolean clientOnly = true;
+        updateCachesInfo(exchActions);
 
         // Affinity did not change for existing caches.
         forAllCaches(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@@ -367,87 +376,103 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
         });
 
-        Set<Integer> stoppedCaches = null;
+        for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) {
+            DynamicCacheDescriptor cacheDesc = action.descriptor();
 
-        for (DynamicCacheChangeRequest req : reqs) {
-            if (!(req.clientStartOnly() || req.close()))
-                clientOnly = false;
+            DynamicCacheChangeRequest req = action.request();
 
-            Integer cacheId = CU.cacheId(req.cacheName());
+            boolean startCache;
 
-            if (req.start()) {
-                cctx.cache().prepareCacheStart(req, fut.topologyVersion());
+            NearCacheConfiguration nearCfg = null;
 
-                if (fut.isCacheAdded(cacheId, fut.topologyVersion())) {
+            if (cctx.localNodeId().equals(req.initiatingNodeId())) {
+                startCache = true;
+
+                nearCfg = req.nearCacheConfiguration();
+            }
+            else {
+                startCache = cctx.cacheContext(action.descriptor().cacheId()) == null &&
+                    CU.affinityNode(cctx.localNode(), req.startCacheConfiguration().getNodeFilter());
+            }
+
+            if (startCache) {
+                cctx.cache().prepareCacheStart(cacheDesc, nearCfg, fut.topologyVersion());
+
+                if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
                     if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
                         U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
                 }
+            }
 
-                if (!crd || !lateAffAssign) {
-                    GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+            if (!crd || !lateAffAssign) {
+                GridCacheContext cacheCtx = cctx.cacheContext(cacheDesc.cacheId());
 
-                    if (cacheCtx != null && !cacheCtx.isLocal()) {
-                        boolean clientCacheStarted =
-                            req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId());
+                if (cacheCtx != null && !cacheCtx.isLocal()) {
+                    boolean clientCacheStarted =
+                        req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId());
 
-                        if (clientCacheStarted)
-                            initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
-                        else if (!req.clientStartOnly()) {
-                            assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion());
+                    if (clientCacheStarted)
+                        initAffinity(cacheDesc, cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
+                    else if (!req.clientStartOnly()) {
+                        assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion());
 
-                            GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
+                        GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
 
-                            assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
+                        assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
 
-                            List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
-                                fut.discoveryEvent(), fut.discoCache());
+                        List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
+                            fut.discoveryEvent(), fut.discoCache());
 
-                            aff.initialize(fut.topologyVersion(), assignment);
-                        }
+                        aff.initialize(fut.topologyVersion(), assignment);
                     }
                 }
-                else
-                    initStartedCacheOnCoordinator(fut, cacheId);
             }
-            else if (req.stop() || req.close()) {
-                cctx.cache().blockGateway(req);
+            else
+                initStartedCacheOnCoordinator(fut, cacheDesc.cacheId());
+        }
 
-                if (crd) {
-                    boolean rmvCache = false;
+        for (DynamicCacheChangeRequest req : exchActions.closeRequests(cctx.localNodeId())) {
+            Integer cacheId = CU.cacheId(req.cacheName());
 
-                    if (req.close() && req.initiatingNodeId().equals(cctx.localNodeId())) {
-                        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+            cctx.cache().blockGateway(req);
 
-                        rmvCache = cacheCtx != null && !cacheCtx.affinityNode();
-                    }
-                    else if (req.stop())
-                        rmvCache = true;
+            if (crd) {
+                GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
-                    if (rmvCache) {
-                        CacheHolder cache = caches.remove(cacheId);
+                // Client cache was stopped, need create 'client' CacheHolder.
+                if (cacheCtx != null && !cacheCtx.affinityNode()) {
+                    CacheHolder cache = caches.remove(cacheId);
 
-                        if (cache != null) {
-                            if (!req.stop()) {
-                                assert !cache.client();
+                    assert !cache.client() : cache;
 
-                                cache = CacheHolder2.create(cctx,
-                                    cctx.cache().cacheDescriptor(cacheId),
-                                    fut,
-                                    cache.affinity());
+                    cache = CacheHolder2.create(cctx,
+                        cctx.cache().cacheDescriptor(cacheId),
+                        fut,
+                        cache.affinity());
 
-                                caches.put(cacheId, cache);
-                            }
-                            else {
-                                if (stoppedCaches == null)
-                                    stoppedCaches = new HashSet<>();
+                    caches.put(cacheId, cache);
+                }
+            }
+        }
 
-                                stoppedCaches.add(cache.cacheId());
+        Set<Integer> stoppedCaches = null;
 
-                                cctx.io().removeHandler(cacheId, GridDhtAffinityAssignmentResponse.class);
-                            }
-                        }
-                    }
-                }
+        for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
+            DynamicCacheDescriptor desc = action.descriptor();
+
+            cctx.cache().blockGateway(action.request());
+
+            if (crd && lateAffAssign && desc.cacheConfiguration().getCacheMode() != LOCAL) {
+                CacheHolder cache = caches.remove(desc.cacheId());
+
+                assert cache != null : action.request();
+
+                if (stoppedCaches == null)
+                    stoppedCaches = new HashSet<>();
+
+                stoppedCaches.add(cache.cacheId());
+
+                cctx.io().removeHandler(desc.cacheId(), GridDhtAffinityAssignmentResponse.class);
             }
         }
 
@@ -479,7 +504,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
         }
 
-        return clientOnly;
+        return exchActions.clientOnlyExchange();
     }
 
     /**
@@ -578,7 +603,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 assert affTopVer.topologyVersion() > 0 : affTopVer;
 
-                IgniteUuid deploymentId = registeredCaches.get(aff.cacheId()).deploymentId();
+                DynamicCacheDescriptor desc = registeredCaches.get(aff.cacheId());
+
+                assert desc != null : aff.cacheName();
+
+                IgniteUuid deploymentId = desc.deploymentId();
 
                 if (!deploymentId.equals(deploymentIds.get(aff.cacheId()))) {
                     aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer);
@@ -659,7 +688,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 });
             }
             else
-                initCachesAffinity(fut);
+                initAffinityNoLateAssignment(fut);
         }
     }
 
@@ -667,7 +696,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param fut Future to add.
      */
     public void addDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
-        GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.key(), fut);
+        GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.id(), fut);
 
         assert old == null : "More than one thread is trying to fetch partition assignments [fut=" + fut +
             ", allFuts=" + pendingAssignmentFetchFuts + ']';
@@ -677,27 +706,23 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param fut Future to remove.
      */
     public void removeDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
-        boolean rmv = pendingAssignmentFetchFuts.remove(fut.key(), fut);
+        boolean rmv = pendingAssignmentFetchFuts.remove(fut.id(), fut);
 
-        assert rmv : "Failed to remove assignment fetch future: " + fut.key();
+        assert rmv : "Failed to remove assignment fetch future: " + fut.id();
     }
 
     /**
-     * @param cacheId Cache ID.
      * @param nodeId Node ID.
      * @param res Response.
      */
-    private void processAffinityAssignmentResponse(Integer cacheId, UUID nodeId, GridDhtAffinityAssignmentResponse res) {
+    private void processAffinityAssignmentResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
         if (log.isDebugEnabled())
             log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']');
 
-        for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) {
-            if (fut.key().get1().equals(cacheId)) {
-                fut.onResponse(nodeId, res);
+        GridDhtAssignmentFetchFuture fut = pendingAssignmentFetchFuts.get(res.futureId());
 
-                break;
-            }
-        }
+        if (fut != null)
+            fut.onResponse(nodeId, res);
     }
 
     /**
@@ -773,7 +798,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * Initialized affinity started on this exchange.
+     * Initialized affinity for cache received from node joining on this exchange.
      *
      * @param crd Coordinator flag.
      * @param fut Exchange future.
@@ -782,12 +807,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      */
     public void initStartedCaches(boolean crd,
         final GridDhtPartitionsExchangeFuture fut,
-        @Nullable Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException {
-        if (descs != null) {
-            for (DynamicCacheDescriptor desc : descs) {
-                if (!registeredCaches.containsKey(desc.cacheId()))
-                    registeredCaches.put(desc.cacheId(), desc);
-            }
+        Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException {
+        for (DynamicCacheDescriptor desc : descs) {
+            if (!registeredCaches.containsKey(desc.cacheId()))
+                registeredCaches.put(desc.cacheId(), desc);
         }
 
         if (crd && lateAffAssign) {
@@ -808,28 +831,34 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             forAllCaches(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                 @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                     if (aff.lastVersion().equals(AffinityTopologyVersion.NONE))
-                        initAffinity(aff, fut, false);
+                        initAffinity(registeredCaches.get(aff.cacheId()), aff, fut, false);
                 }
             });
         }
     }
 
     /**
+     * @param desc Cache descriptor.
      * @param aff Affinity.
      * @param fut Exchange future.
      * @param fetch Force fetch flag.
      * @throws IgniteCheckedException If failed.
      */
-    private void initAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut, boolean fetch)
+    private void initAffinity(DynamicCacheDescriptor desc,
+        GridAffinityAssignmentCache aff,
+        GridDhtPartitionsExchangeFuture fut,
+        boolean fetch)
         throws IgniteCheckedException {
-        if (!fetch && canCalculateAffinity(aff, fut)) {
+        assert desc != null;
+
+        if (!fetch && canCalculateAffinity(desc, aff, fut)) {
             List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
 
             aff.initialize(fut.topologyVersion(), assignment);
         }
         else {
             GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                aff.cacheName(),
+                desc,
                 fut.topologyVersion(),
                 fut.discoCache());
 
@@ -840,11 +869,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @param desc Cache descriptor.
      * @param aff Affinity.
      * @param fut Exchange future.
      * @return {@code True} if local node can calculate affinity on it's own for this partition map exchange.
      */
-    private boolean canCalculateAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut) {
+    private boolean canCalculateAffinity(DynamicCacheDescriptor desc,
+        GridAffinityAssignmentCache aff,
+        GridDhtPartitionsExchangeFuture fut) {
+        assert desc != null : aff.cacheName();
+
         // Do not request affinity from remote nodes if affinity function is not centralized.
         if (!aff.centralizedAffinityFunction())
             return true;
@@ -852,13 +886,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         // If local node did not initiate exchange or local node is the only cache node in grid.
         Collection<ClusterNode> affNodes = cctx.discovery().cacheAffinityNodes(aff.cacheId(), fut.topologyVersion());
 
-        DynamicCacheDescriptor cacheDesc = registeredCaches.get(aff.cacheId());
-
-        assert cacheDesc != null : aff.cacheName();
-
-        return fut.cacheStarted(aff.cacheId()) ||
+        return fut.cacheAddedOnExchange(aff.cacheId(), desc.receivedFrom()) ||
             !fut.exchangeId().nodeId().equals(cctx.localNodeId()) ||
-            cctx.localNodeId().equals(cacheDesc.receivedFrom()) ||
             (affNodes.size() == 1 && affNodes.contains(cctx.localNode()));
     }
 
@@ -898,7 +927,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 waitRebalanceInfo = initAffinityOnNodeJoin(fut, crd);
         }
         else
-            initCachesAffinity(fut);
+            initAffinityNoLateAssignment(fut);
 
         synchronized (mux) {
             affCalcVer = fut.topologyVersion();
@@ -958,7 +987,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
             else {
                 GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                    cacheCtx.name(),
+                    cacheDesc,
                     topVer,
                     fut.discoCache());
 
@@ -971,7 +1000,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         for (int i = 0; i < fetchFuts.size(); i++) {
             GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i);
 
-            Integer cacheId = fetchFut.key().get1();
+            Integer cacheId = fetchFut.cacheId();
 
             fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut);
         }
@@ -1042,7 +1071,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             centralizedAff = true;
         }
         else {
-            initCachesAffinity(fut);
+            initAffinityNoLateAssignment(fut);
 
             centralizedAff = false;
         }
@@ -1060,14 +1089,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param fut Exchange future.
      * @throws IgniteCheckedException If failed.
      */
-    private void initCachesAffinity(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
+    private void initAffinityNoLateAssignment(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
         assert !lateAffAssign;
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (cacheCtx.isLocal())
                 continue;
 
-            initAffinity(cacheCtx.affinity().affinityCache(), fut, false);
+            initAffinity(registeredCaches.get(cacheCtx.cacheId()), cacheCtx.affinity().affinityCache(), fut, false);
         }
     }
 
@@ -1099,7 +1128,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     cctx.io().addHandler(desc.cacheId(), GridDhtAffinityAssignmentResponse.class,
                         new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
                             @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-                                processAffinityAssignmentResponse(cacheId, nodeId, res);
+                                processAffinityAssignmentResponse(nodeId, res);
                             }
                         }
                     );
@@ -1125,7 +1154,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev;
 
                     GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                        aff.cacheName(),
+                        desc,
                         prev.topologyVersion(),
                         prev.discoCache());
 
@@ -1192,7 +1221,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             cctx.io().addHandler(cacheId, GridDhtAffinityAssignmentResponse.class,
                 new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
                     @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-                        processAffinityAssignmentResponse(cacheId, nodeId, res);
+                        processAffinityAssignmentResponse(nodeId, res);
                     }
                 }
             );
@@ -1714,7 +1743,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             assert ccfg != null : cacheDesc;
             assert ccfg.getCacheMode() != LOCAL : ccfg.getName();
 
-            assert !cctx.discovery().cacheAffinityNodes(ccfg.getName(), fut.topologyVersion()).contains(cctx.localNode());
+            assert !cctx.discovery().cacheAffinityNodes(ccfg.getName(),
+                fut.topologyVersion()).contains(cctx.localNode()) : cacheDesc.cacheName();
 
             AffinityFunction affFunc = cctx.cache().clone(ccfg.getAffinity());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
new file mode 100644
index 0000000..a30331f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Discovery data sent from client reconnecting to cluster.
+ */
+public class CacheClientReconnectDiscoveryData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final Map<String, CacheInfo> clientCaches;
+
+    /**
+     * @param clientCaches Information about caches started on re-joining client node.
+     */
+    CacheClientReconnectDiscoveryData(Map<String, CacheInfo> clientCaches) {
+        this.clientCaches = clientCaches;
+    }
+
+    /**
+     * @return Information about caches started on re-joining client node.
+     */
+    Map<String, CacheInfo> clientCaches() {
+        return clientCaches;
+    }
+
+    /**
+     * Information about a single cache started on the re-joining client node.
+     */
+    static class CacheInfo implements Serializable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final CacheConfiguration ccfg;
+
+        /** */
+        private final CacheType cacheType;
+
+        /** */
+        private final IgniteUuid deploymentId;
+
+        /** */
+        private final boolean nearCache;
+
+        /** Flags added for future usage. */
+        private final byte flags;
+
+        /**
+         * @param ccfg Cache configuration.
+         * @param cacheType Cache type.
+         * @param deploymentId Cache deployment ID.
+         * @param nearCache Near cache flag.
+         * @param flags Flags (for future usage).
+         */
+        CacheInfo(CacheConfiguration ccfg,
+            CacheType cacheType,
+            IgniteUuid deploymentId,
+            boolean nearCache,
+            byte flags) {
+            assert ccfg != null;
+            assert cacheType != null;
+            assert deploymentId != null;
+
+            this.ccfg = ccfg;
+            this.cacheType = cacheType;
+            this.deploymentId = deploymentId;
+            this.nearCache = nearCache;
+            this.flags = flags;
+        }
+
+        /**
+         * @return Cache configuration.
+         */
+        CacheConfiguration config() {
+            return ccfg;
+        }
+
+        /**
+         * @return Cache type.
+         */
+        CacheType cacheType() {
+            return cacheType;
+        }
+
+        /**
+         * @return Cache deployment ID.
+         */
+        IgniteUuid deploymentId() {
+            return deploymentId;
+        }
+
+        /**
+         * @return Near cache flag.
+         */
+        boolean nearCache() {
+            return nearCache;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CacheInfo.class, this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheClientReconnectDiscoveryData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
new file mode 100644
index 0000000..4768a9a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Cache information sent in discovery data to joining node.
+ */
+public class CacheData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final CacheConfiguration cacheCfg;
+
+    /** */
+    private final Integer cacheId;
+
+    /** */
+    private final CacheType cacheType;
+
+    /** */
+    private final IgniteUuid deploymentId;
+
+    /** */
+    private final QuerySchema schema;
+
+    /** */
+    private final UUID rcvdFrom;
+
+    /** */
+    private final boolean staticCfg;
+
+    /** */
+    private final boolean template;
+
+    /** Flags added for future usage. */
+    private final byte flags;
+
+    /**
+     * @param cacheCfg Cache configuration.
+     * @param cacheId Cache ID.
+     * @param cacheType Cache type.
+     * @param deploymentId Cache deployment ID.
+     * @param schema Query schema.
+     * @param rcvdFrom Node ID cache was started from.
+     * @param staticCfg {@code True} if cache was statically configured.
+     * @param template {@code True} if this is cache template.
+     * @param flags Flags (added for future usage).
+     */
+    CacheData(CacheConfiguration cacheCfg,
+        int cacheId,
+        CacheType cacheType,
+        IgniteUuid deploymentId,
+        QuerySchema schema,
+        UUID rcvdFrom,
+        boolean staticCfg,
+        boolean template,
+        byte flags) {
+        assert cacheCfg != null;
+        assert rcvdFrom != null : cacheCfg.getName();
+        assert deploymentId != null : cacheCfg.getName();
+        assert template || cacheId != 0 : cacheCfg.getName();
+
+        this.cacheCfg = cacheCfg;
+        this.cacheId = cacheId;
+        this.cacheType = cacheType;
+        this.deploymentId = deploymentId;
+        this.schema = schema;
+        this.rcvdFrom = rcvdFrom;
+        this.staticCfg = staticCfg;
+        this.template = template;
+        this.flags = flags;
+    }
+
+    /**
+     * @return Cache ID.
+     */
+    public Integer cacheId() {
+        return cacheId;
+    }
+
+    /**
+     * @return {@code True} if this is template configuration.
+     */
+    public boolean template() {
+        return template;
+    }
+
+    /**
+     * @return Cache type.
+     */
+    public CacheType cacheType() {
+        return cacheType;
+    }
+
+    /**
+     * @return Cache deployment ID.
+     */
+    public IgniteUuid deploymentId() {
+        return deploymentId;
+    }
+
+    /**
+     * @return {@code True} if statically configured.
+     */
+    public boolean staticallyConfigured() {
+        return staticCfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    public CacheConfiguration cacheConfiguration() {
+        return cacheCfg;
+    }
+
+    /**
+     * @return Schema.
+     */
+    public QuerySchema schema() {
+        return schema.copy();
+    }
+
+    /**
+     * @return ID of the node that provided the cache configuration.
+     */
+    public UUID receivedFrom() {
+        return rcvdFrom;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheData.class, this, "cacheName", cacheCfg.getName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
new file mode 100644
index 0000000..ea24140
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Information about configured caches sent from joining node.
+ */
+class CacheJoinNodeDiscoveryData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    @GridToStringInclude
+    private final Map<String, CacheInfo> caches;
+
+    /** */
+    @GridToStringInclude
+    private final Map<String, CacheInfo> templates;
+
+    /** */
+    @GridToStringInclude
+    private final IgniteUuid cacheDeploymentId;
+
+    /** */
+    private final boolean startCaches;
+
+    /**
+     * @param cacheDeploymentId Deployment ID for started caches.
+     * @param caches Caches.
+     * @param templates Templates.
+     * @param startCaches {@code True} if required to start all caches on joining node.
+     */
+    CacheJoinNodeDiscoveryData(
+        IgniteUuid cacheDeploymentId,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates,
+        boolean startCaches) {
+        this.cacheDeploymentId = cacheDeploymentId;
+        this.caches = caches;
+        this.templates = templates;
+        this.startCaches = startCaches;
+    }
+
+    /**
+     * @return {@code True} if required to start all caches on joining node.
+     */
+    boolean startCaches() {
+        return startCaches;
+    }
+
+    /**
+     * @return Deployment ID assigned on joining node.
+     */
+    IgniteUuid cacheDeploymentId() {
+        return cacheDeploymentId;
+    }
+
+    /**
+     * @return Templates configured on joining node.
+     */
+    Map<String, CacheInfo> templates() {
+        return templates;
+    }
+
+    /**
+     * @return Caches configured on joining node.
+     */
+    Map<String, CacheInfo> caches() {
+        return caches;
+    }
+
+    /**
+     * Configuration and type of a cache or template configured on the joining node.
+     */
+    static class CacheInfo implements Serializable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        @GridToStringInclude
+        private final CacheConfiguration ccfg;
+
+        /** */
+        @GridToStringInclude
+        private final CacheType cacheType;
+
+        /** Flags added for future usage. */
+        private final byte flags;
+
+        /**
+         * @param ccfg Cache configuration.
+         * @param cacheType Cache type.
+         * @param flags Flags (for future usage).
+         */
+        CacheInfo(CacheConfiguration ccfg, CacheType cacheType, byte flags) {
+            this.ccfg = ccfg;
+            this.cacheType = cacheType;
+            this.flags = flags;
+        }
+
+        /**
+         * @return Cache configuration.
+         */
+        CacheConfiguration config() {
+            return ccfg;
+        }
+
+        /**
+         * @return Cache type.
+         */
+        CacheType cacheType() {
+            return cacheType;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CacheInfo.class, this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheJoinNodeDiscoveryData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
new file mode 100644
index 0000000..84a33dc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Cache information sent in discovery data to joining node.
+ */
+class CacheNodeCommonDiscoveryData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    @GridToStringInclude
+    private final Map<String, CacheData> caches;
+
+    /** */
+    @GridToStringInclude
+    private final Map<String, CacheData> templates;
+
+    /** */
+    private final Map<String, Map<UUID, Boolean>> clientNodesMap;
+
+    /**
+     * @param caches Started caches.
+     * @param templates Configured templates.
+     * @param clientNodesMap Information about cache client nodes.
+     */
+    CacheNodeCommonDiscoveryData(Map<String, CacheData> caches,
+        Map<String, CacheData> templates,
+        Map<String, Map<UUID, Boolean>> clientNodesMap) {
+        this.caches = caches;
+        this.templates = templates;
+        this.clientNodesMap = clientNodesMap;
+    }
+
+    /**
+     * @return Started caches.
+     */
+    Map<String, CacheData> caches() {
+        return caches;
+    }
+
+    /**
+     * @return Configured templates.
+     */
+    Map<String, CacheData> templates() {
+        return templates;
+    }
+
+    /**
+     * @return Information about cache client nodes.
+     */
+    Map<String, Map<UUID, Boolean>> clientNodesMap() {
+        return clientNodesMap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheNodeCommonDiscoveryData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
new file mode 100644
index 0000000..28ec600
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -0,0 +1,903 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheExistsException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
+
+/**
+ * Logic related to cache discovery data processing.
+ */
+class ClusterCachesInfo {
+    /** */
+    private final GridKernalContext ctx;
+
+    /** Dynamic caches. */
+    private final ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
+
+    /** Cache templates. */
+    private final ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
+
+    /** */
+    private CacheJoinNodeDiscoveryData joinDiscoData;
+
+    /** */
+    private CacheNodeCommonDiscoveryData gridData;
+
+    /** */
+    private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
+
+    /** */
+    private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
+
+    /**
+     * @param ctx Context.
+     */
+    ClusterCachesInfo(GridKernalContext ctx) {
+        this.ctx = ctx;
+
+        log = ctx.log(getClass());
+    }
+
+    /**
+     * @param joinDiscoData Information about configured caches and templates.
+     */
+    void onStart(CacheJoinNodeDiscoveryData joinDiscoData) {
+        this.joinDiscoData = joinDiscoData;
+
+        processJoiningNode(joinDiscoData, ctx.localNodeId());
+    }
+
+    /**
+     * @param checkConsistency {@code True} if cache configuration consistency must be checked.
+     * @throws IgniteCheckedException If failed.
+     */
+    void onKernalStart(boolean checkConsistency) throws IgniteCheckedException {
+        if (checkConsistency && joinDiscoData != null && gridData != null) {
+            for (CacheJoinNodeDiscoveryData.CacheInfo locCacheInfo : joinDiscoData.caches().values()) {
+                CacheConfiguration locCfg = locCacheInfo.config();
+
+                CacheData cacheData = gridData.caches().get(locCfg.getName());
+
+                if (cacheData != null)
+                    checkCache(locCfg, cacheData.cacheConfiguration(), cacheData.receivedFrom());
+            }
+        }
+
+        joinDiscoData = null;
+        gridData = null;
+    }
+    /**
+     * Checks that the remote cache configuration is compatible with the local one.
+     *
+     * @param locCfg Local configuration.
+     * @param rmtCfg Remote configuration.
+     * @param rmt Remote node.
+     * @throws IgniteCheckedException If check failed.
+     */
+    private void checkCache(CacheConfiguration<?, ?> locCfg, CacheConfiguration<?, ?> rmtCfg, UUID rmt)
+        throws IgniteCheckedException {
+        GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtCfg);
+        GridCacheAttributes locAttr = new GridCacheAttributes(locCfg);
+
+        CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode",
+            locAttr.cacheMode(), rmtAttr.cacheMode(), true);
+
+        if (rmtAttr.cacheMode() != LOCAL) {
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "interceptor", "Cache Interceptor",
+                locAttr.interceptorClassName(), rmtAttr.interceptorClassName(), true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "atomicityMode",
+                "Cache atomicity mode", locAttr.atomicityMode(), rmtAttr.atomicityMode(), true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode",
+                "Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true);
+
+            ClusterNode rmtNode = ctx.discovery().node(rmt);
+
+            if (CU.affinityNode(ctx.discovery().localNode(), locCfg.getNodeFilter())
+                && rmtNode != null && CU.affinityNode(rmtNode, rmtCfg.getNodeFilter())) {
+                CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeFactory", "Store factory",
+                    locAttr.storeFactoryClassName(), rmtAttr.storeFactoryClassName(), true);
+            }
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinity", "Cache affinity",
+                    locAttr.cacheAffinityClassName(), rmtAttr.cacheAffinityClassName(), true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinityMapper",
+                "Cache affinity mapper", locAttr.cacheAffinityMapperClassName(),
+                rmtAttr.cacheAffinityMapperClassName(), true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityPartitionsCount",
+                "Affinity partitions count", locAttr.affinityPartitionsCount(),
+                rmtAttr.affinityPartitionsCount(), true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionFilter", "Eviction filter",
+                locAttr.evictionFilterClassName(), rmtAttr.evictionFilterClassName(), true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionPolicy", "Eviction policy",
+                locAttr.evictionPolicyClassName(), rmtAttr.evictionPolicyClassName(), true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "transactionManagerLookup",
+                "Transaction manager lookup", locAttr.transactionManagerLookupClassName(),
+                rmtAttr.transactionManagerLookupClassName(), false);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "defaultLockTimeout",
+                "Default lock timeout", locAttr.defaultLockTimeout(), rmtAttr.defaultLockTimeout(), false);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "preloadBatchSize",
+                "Preload batch size", locAttr.rebalanceBatchSize(), rmtAttr.rebalanceBatchSize(), false);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeSynchronizationMode",
+                "Write synchronization mode", locAttr.writeSynchronization(), rmtAttr.writeSynchronization(),
+                true);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindBatchSize",
+                "Write behind batch size", locAttr.writeBehindBatchSize(), rmtAttr.writeBehindBatchSize(),
+                false);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindEnabled",
+                "Write behind enabled", locAttr.writeBehindEnabled(), rmtAttr.writeBehindEnabled(), false);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushFrequency",
+                "Write behind flush frequency", locAttr.writeBehindFlushFrequency(),
+                rmtAttr.writeBehindFlushFrequency(), false);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushSize",
+                "Write behind flush size", locAttr.writeBehindFlushSize(), rmtAttr.writeBehindFlushSize(),
+                false);
+
+            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushThreadCount",
+                "Write behind flush thread count", locAttr.writeBehindFlushThreadCount(),
+                rmtAttr.writeBehindFlushThreadCount(), false);
+
+            if (locAttr.cacheMode() == PARTITIONED) {
+                CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "nearEvictionPolicy",
+                    "Near eviction policy", locAttr.nearEvictionPolicyClassName(),
+                    rmtAttr.nearEvictionPolicyClassName(), false);
+
+                CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityIncludeNeighbors",
+                    "Affinity include neighbors", locAttr.affinityIncludeNeighbors(),
+                    rmtAttr.affinityIncludeNeighbors(), true);
+
+                CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityKeyBackups",
+                    "Affinity key backups", locAttr.affinityKeyBackups(),
+                    rmtAttr.affinityKeyBackups(), true);
+            }
+        }
+    }
+
+    /**
+     * Processes a batch of dynamic cache change requests and updates registered caches and templates.
+     * <p>
+     * Template requests only register a template descriptor (if a template with this name is not registered yet)
+     * and complete the template add future. For each other request the method decides whether
+     * an exchange is needed: start of a new cache, start of a client cache for an existing cache, reset of lost
+     * partitions, cache stop and client cache close may all require it. Actions requiring exchange are collected
+     * in {@link ExchangeActions} which is attached to the batch when at least one request needs exchange.
+     * <p>
+     * Requests initiated by the local node which do not need exchange are completed through
+     * {@code ctx.closure().callLocalSafe(...)}; if a topology version to wait for was determined for the request,
+     * the request is completed when the affinity ready future for that version is done.
+     *
+     * @param batch Cache change request.
+     * @param topVer Topology version.
+     * @return {@code True} if minor topology version should be increased.
+     */
+    boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) {
+        ExchangeActions exchangeActions = new ExchangeActions();
+
+        boolean incMinorTopVer = false;
+
+        List<DynamicCacheDescriptor> addedDescs = new ArrayList<>();
+
+        final List<T2<DynamicCacheChangeRequest, AffinityTopologyVersion>> reqsToComplete = new ArrayList<>();
+
+        for (DynamicCacheChangeRequest req : batch.requests()) {
+            if (req.template()) {
+                CacheConfiguration ccfg = req.startCacheConfiguration();
+
+                assert ccfg != null : req;
+
+                DynamicCacheDescriptor desc = registeredTemplates.get(req.cacheName());
+
+                if (desc == null) {
+                    DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx,
+                        ccfg,
+                        req.cacheType(),
+                        true,
+                        req.initiatingNodeId(),
+                        false,
+                        req.deploymentId(),
+                        req.schema());
+
+                    DynamicCacheDescriptor old = registeredTemplates().put(ccfg.getName(), templateDesc);
+
+                    assert old == null;
+
+                    addedDescs.add(templateDesc);
+                }
+
+                ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
+
+                continue;
+            }
+
+            DynamicCacheDescriptor desc = req.globalStateChange() ? null : registeredCaches.get(req.cacheName());
+
+            boolean needExchange = false;
+
+            AffinityTopologyVersion waitTopVer = null;
+
+            if (req.start()) {
+                if (desc == null) {
+                    if (req.clientStartOnly()) {
+                        ctx.cache().completeCacheStartFuture(req, new IgniteCheckedException("Failed to start " +
+                            "client cache (a cache with the given name is not started): " + req.cacheName()));
+                    }
+                    else {
+                        CacheConfiguration<?, ?> ccfg = req.startCacheConfiguration();
+
+                        assert req.cacheType() != null : req;
+                        assert F.eq(ccfg.getName(), req.cacheName()) : req;
+
+                        DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx,
+                            ccfg,
+                            req.cacheType(),
+                            false,
+                            req.initiatingNodeId(),
+                            false,
+                            req.deploymentId(),
+                            req.schema());
+
+                        DynamicCacheDescriptor old = registeredCaches.put(ccfg.getName(), startDesc);
+
+                        assert old == null;
+
+                        ctx.discovery().setCacheFilter(
+                            ccfg.getName(),
+                            ccfg.getNodeFilter(),
+                            ccfg.getNearConfiguration() != null,
+                            ccfg.getCacheMode());
+
+                        ctx.discovery().addClientNode(req.cacheName(),
+                            req.initiatingNodeId(),
+                            req.nearCacheConfiguration() != null);
+
+                        addedDescs.add(startDesc);
+
+                        exchangeActions.addCacheToStart(req, startDesc);
+
+                        needExchange = true;
+                    }
+                }
+                else {
+                    assert req.initiatingNodeId() != null : req;
+
+                    // Cache already exists, exchange is needed only if client cache should be created.
+                    ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
+
+                    boolean clientReq = node != null &&
+                        !ctx.discovery().cacheAffinityNode(node, req.cacheName());
+
+                    if (req.clientStartOnly()) {
+                        needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
+                            req.initiatingNodeId(),
+                            req.nearCacheConfiguration() != null);
+                    }
+                    else {
+                        if (req.failIfExists()) {
+                            ctx.cache().completeCacheStartFuture(req,
+                                new CacheExistsException("Failed to start cache " +
+                                    "(a cache with the same name is already started): " + req.cacheName()));
+                        }
+                        else {
+                            needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
+                                req.initiatingNodeId(),
+                                req.nearCacheConfiguration() != null);
+                        }
+                    }
+
+                    if (needExchange) {
+                        req.clientStartOnly(true); // Cache exists, request is handled as client cache start.
+
+                        desc.clientCacheStartVersion(topVer.nextMinorVersion());
+
+                        exchangeActions.addClientCacheToStart(req, desc);
+                    }
+                }
+
+                if (!needExchange && desc != null) { // No exchange: pick topology version to wait for on completion.
+                    if (desc.clientCacheStartVersion() != null)
+                        waitTopVer = desc.clientCacheStartVersion();
+                    else
+                        waitTopVer = desc.startTopologyVersion();
+                }
+            }
+            else if (req.globalStateChange())
+                exchangeActions.newClusterState(req.state());
+            else if (req.resetLostPartitions()) {
+                if (desc != null) {
+                    needExchange = true;
+
+                    exchangeActions.addCacheToResetLostPartitions(req, desc);
+                }
+            }
+            else if (req.stop()) {
+                assert req.stop() ^ req.close() : req;
+
+                if (desc != null) {
+                    DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName());
+
+                    assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
+
+                    ctx.discovery().removeCacheFilter(req.cacheName());
+
+                    needExchange = true;
+
+                    exchangeActions.addCacheToStop(req, desc);
+                }
+            }
+            else if (req.close()) {
+                if (desc != null) {
+                    needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
+
+                    if (needExchange)
+                        exchangeActions.addCacheToClose(req, desc);
+                }
+            }
+            else
+                assert false : req;
+
+            if (!needExchange) {
+                if (req.initiatingNodeId().equals(ctx.localNodeId())) // Only the initiating node completes the request.
+                    reqsToComplete.add(new T2<>(req, waitTopVer));
+            }
+            else
+                incMinorTopVer = true;
+        }
+
+        if (!F.isEmpty(addedDescs)) {
+            AffinityTopologyVersion startTopVer = incMinorTopVer ? topVer.nextMinorVersion() : topVer;
+
+            for (DynamicCacheDescriptor desc : addedDescs) {
+                assert desc.template() || incMinorTopVer; // Every added non-template cache requested exchange.
+
+                desc.startTopologyVersion(startTopVer);
+            }
+        }
+
+        if (!F.isEmpty(reqsToComplete)) {
+            ctx.closure().callLocalSafe(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    for (T2<DynamicCacheChangeRequest, AffinityTopologyVersion> t :reqsToComplete) {
+                        final DynamicCacheChangeRequest req = t.get1();
+                        AffinityTopologyVersion waitTopVer = t.get2();
+
+                        IgniteInternalFuture<?> fut = waitTopVer != null ?
+                            ctx.cache().context().exchange().affinityReadyFuture(waitTopVer) : null;
+
+                        if (fut == null || fut.isDone())
+                            ctx.cache().completeCacheStartFuture(req, null);
+                        else {
+                            fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?> fut) {
+                                    ctx.cache().completeCacheStartFuture(req, null);
+                                }
+                            });
+                        }
+                    }
+
+                    return null;
+                }
+            });
+        }
+
+        if (incMinorTopVer) {
+            assert !exchangeActions.empty() : exchangeActions;
+
+            batch.exchangeActions(exchangeActions);
+        }
+
+        return incMinorTopVer;
+    }
+
+    /**
+     * Adds cache data of the joining local node to the discovery data bag. Daemon nodes add nothing.
+     *
+     * @param dataBag Discovery data bag.
+     */
+    void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+        if (ctx.isDaemon())
+            return;
+
+        Serializable joinData = joinDiscoveryData();
+
+        dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), joinData);
+    }
+
+    /**
+     * Prepares discovery data sent when the local node joins.
+     *
+     * @return Client reconnect data describing currently started caches if caches were saved on disconnect,
+     *      otherwise the join data stored in {@code joinDiscoData}.
+     */
+    private Serializable joinDiscoveryData() {
+        if (cachesOnDisconnect == null) {
+            assert ctx.config().isDaemon() || joinDiscoData != null || !ctx.state().active();
+
+            return joinDiscoData;
+        }
+
+        Map<String, CacheClientReconnectDiscoveryData.CacheInfo> reconnectInfo = new HashMap<>();
+
+        for (IgniteInternalCache cache : ctx.cache().caches()) {
+            String name = cache.name();
+
+            DynamicCacheDescriptor desc = cachesOnDisconnect.get(name);
+
+            assert desc != null : name;
+
+            CacheClientReconnectDiscoveryData.CacheInfo info = new CacheClientReconnectDiscoveryData.CacheInfo(
+                desc.cacheConfiguration(),
+                desc.cacheType(),
+                desc.deploymentId(),
+                cache.context().isNear(),
+                (byte)0);
+
+            reconnectInfo.put(name, info);
+        }
+
+        return new CacheClientReconnectDiscoveryData(reconnectInfo);
+    }
+
+    /**
+     * Called from exchange worker. Returns the prepared list and resets the field holding it,
+     * daemon nodes always get an empty list.
+     *
+     * @return Caches to be started when this node starts.
+     */
+    List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() {
+        if (ctx.isDaemon())
+            return Collections.emptyList();
+
+        List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> res = locJoinStartCaches;
+
+        assert res != null;
+
+        // The list is handed over once, field is reset afterwards.
+        locJoinStartCaches = null;
+
+        return res;
+    }
+
+    /**
+     * Collects statically configured caches whose descriptors were received from the given node.
+     *
+     * @param joinedNodeId Joined node ID.
+     * @return New caches received from joined node (empty list if there are none or local node is daemon).
+     */
+    List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) {
+        assert joinedNodeId != null;
+
+        if (ctx.isDaemon())
+            return Collections.emptyList();
+
+        List<DynamicCacheDescriptor> res = null;
+
+        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            if (!desc.staticallyConfigured())
+                continue;
+
+            assert desc.receivedFrom() != null : desc;
+
+            if (!joinedNodeId.equals(desc.receivedFrom()))
+                continue;
+
+            // Result list is created lazily, on first match.
+            if (res == null)
+                res = new ArrayList<>();
+
+            res.add(desc);
+        }
+
+        if (res == null)
+            return Collections.emptyList();
+
+        return res;
+    }
+
+    /**
+     * Discovery event callback, executed from discovery thread. Only node join events on non-daemon nodes
+     * are handled: start version is set for cache and template descriptors received from the joined node,
+     * and caches to start are initialized if local node joined and no grid data was received.
+     *
+     * @param type Event type.
+     * @param node Event node.
+     * @param topVer Topology version.
+     */
+    void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+        if (type != EVT_NODE_JOINED || ctx.isDaemon())
+            return;
+
+        UUID joinedId = node.id();
+
+        for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) {
+            if (joinedId.equals(cacheDesc.receivedFrom()))
+                cacheDesc.receivedFromStartVersion(topVer);
+        }
+
+        for (DynamicCacheDescriptor tmplDesc : registeredTemplates.values()) {
+            if (joinedId.equals(tmplDesc.receivedFrom()))
+                tmplDesc.receivedFromStartVersion(topVer);
+        }
+
+        // Local node joined and no grid data was received: first node starts.
+        if (joinedId.equals(ctx.discovery().localNode().id()) && gridData == null) {
+            assert joinDiscoData != null || !ctx.state().active();
+
+            initStartCachesForLocalJoin(true);
+        }
+    }
+
+    /**
+     * Adds common cache data to the discovery data bag, unless local node is daemon or common data
+     * for the cache processor is already collected.
+     *
+     * @param dataBag Discovery data bag.
+     */
+    void collectGridNodeData(DiscoveryDataBag dataBag) {
+        int cmpId = CACHE_PROC.ordinal();
+
+        if (!ctx.isDaemon() && !dataBag.commonDataCollectedFor(cmpId))
+            dataBag.addGridCommonData(cmpId, collectCommonDiscoveryData());
+    }
+
+    /**
+     * Creates discovery data from all registered caches and templates and the discovery client nodes map.
+     *
+     * @return Information about started caches.
+     */
+    private CacheNodeCommonDiscoveryData collectCommonDiscoveryData() {
+        Map<String, CacheData> cachesData = new HashMap<>();
+
+        for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) {
+            cachesData.put(cacheDesc.cacheName(), new CacheData(cacheDesc.cacheConfiguration(),
+                cacheDesc.cacheId(),
+                cacheDesc.cacheType(),
+                cacheDesc.deploymentId(),
+                cacheDesc.schema(),
+                cacheDesc.receivedFrom(),
+                cacheDesc.staticallyConfigured(),
+                false,
+                (byte)0));
+        }
+
+        Map<String, CacheData> templatesData = new HashMap<>();
+
+        // Templates are sent with zero cache ID and template flag set.
+        for (DynamicCacheDescriptor tmplDesc : registeredTemplates.values()) {
+            templatesData.put(tmplDesc.cacheName(), new CacheData(tmplDesc.cacheConfiguration(),
+                0,
+                tmplDesc.cacheType(),
+                tmplDesc.deploymentId(),
+                tmplDesc.schema(),
+                tmplDesc.receivedFrom(),
+                tmplDesc.staticallyConfigured(),
+                true,
+                (byte)0));
+        }
+
+        return new CacheNodeCommonDiscoveryData(cachesData, templatesData, ctx.discovery().clientNodesMap());
+    }
+
+    /**
+     * Applies cache data received from the grid. Does nothing on daemon node or if no common data was received.
+     * <p>
+     * Received templates and caches are put into registered templates and caches, for each received cache
+     * a cache filter is set in discovery, and received client nodes are added to discovery. Received data
+     * is then stored in {@code gridData}.
+     * <p>
+     * Finally caches to start on local join are initialized; if node is in disconnected state
+     * the list of such caches is set to empty list instead.
+     *
+     * @param data Discovery data.
+     */
+    void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+        if (ctx.isDaemon() || data.commonData() == null)
+            return;
+
+        assert joinDiscoData != null || disconnectedState() || !ctx.state().active();
+        assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data;
+
+        CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData();
+
+        for (CacheData cacheData : cachesData.templates().values()) {
+            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+                ctx,
+                cacheData.cacheConfiguration(),
+                cacheData.cacheType(),
+                true,
+                cacheData.receivedFrom(),
+                cacheData.staticallyConfigured(),
+                cacheData.deploymentId(),
+                cacheData.schema());
+
+            registeredTemplates.put(cacheData.cacheConfiguration().getName(), desc);
+        }
+
+        for (CacheData cacheData : cachesData.caches().values()) {
+            CacheConfiguration<?, ?> cfg = cacheData.cacheConfiguration();
+
+            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+                ctx,
+                cacheData.cacheConfiguration(),
+                cacheData.cacheType(),
+                false,
+                cacheData.receivedFrom(),
+                cacheData.staticallyConfigured(),
+                cacheData.deploymentId(),
+                cacheData.schema());
+
+            desc.receivedOnDiscovery(true);
+
+            registeredCaches.put(cacheData.cacheConfiguration().getName(), desc);
+
+            ctx.discovery().setCacheFilter(
+                cfg.getName(),
+                cfg.getNodeFilter(),
+                cfg.getNearConfiguration() != null,
+                cfg.getCacheMode());
+        }
+
+        if (!F.isEmpty(cachesData.clientNodesMap())) {
+            for (Map.Entry<String, Map<UUID, Boolean>> entry : cachesData.clientNodesMap().entrySet()) {
+                String cacheName = entry.getKey();
+
+                for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
+                    ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
+            }
+        }
+
+        gridData = cachesData; // Non-null grid data means local node is not the first node (see onDiscoveryEvent).
+
+        if (!disconnectedState())
+            initStartCachesForLocalJoin(false);
+        else
+            locJoinStartCaches = Collections.emptyList();
+    }
+
+    /**
+     * Initializes {@code locJoinStartCaches}, the list of caches (with optional near cache configuration)
+     * to start when local node joins. The list stays empty if there is no local join data.
+     * <p>
+     * If first node starts, registered caches absent in local join data are skipped. If a cache is present
+     * in local join data, a descriptor copy is created with locally provided configuration (start versions
+     * are copied from the registered descriptor) and near cache configuration is taken from local configuration.
+     * A cache is added to the list if it is present in local join data, or join data requires to start caches,
+     * or {@code CU.affinityNode} accepts local node for the cache node filter.
+     *
+     * @param firstNode {@code True} if first node in cluster starts.
+     */
+    private void initStartCachesForLocalJoin(boolean firstNode) {
+        assert locJoinStartCaches == null;
+
+        locJoinStartCaches = new ArrayList<>();
+
+        if (joinDiscoData != null) {
+            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                if (firstNode && !joinDiscoData.caches().containsKey(desc.cacheName()))
+                    continue;
+
+                CacheConfiguration<?, ?> cfg = desc.cacheConfiguration();
+
+                CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName());
+
+                NearCacheConfiguration nearCfg = null;
+
+                if (locCfg != null) {
+                    nearCfg = locCfg.config().getNearConfiguration();
+
+                    DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
+                            locCfg.config(),
+                            desc.cacheType(),
+                            desc.template(),
+                            desc.receivedFrom(),
+                            desc.staticallyConfigured(),
+                            desc.deploymentId(),
+                            desc.schema());
+
+                    desc0.startTopologyVersion(desc.startTopologyVersion());
+                    desc0.receivedFromStartVersion(desc.receivedFromStartVersion());
+                    desc0.clientCacheStartVersion(desc.clientCacheStartVersion());
+
+                    desc = desc0; // Local copy is used only for the start list, registered descriptor is not replaced.
+                }
+
+                if (locCfg != null || joinDiscoData.startCaches() || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter()))
+                    locJoinStartCaches.add(new T2<>(desc, nearCfg));
+            }
+        }
+    }
+
+    /**
+     * Handles cache data sent by a joining node. Client reconnect data is processed immediately, or saved
+     * for later processing if local node itself is in disconnected state. Regular join data is processed
+     * immediately.
+     *
+     * @param data Joining node data.
+     */
+    void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
+        if (!data.hasJoiningNodeData())
+            return;
+
+        Serializable joinData = data.joiningNodeData();
+
+        if (joinData instanceof CacheClientReconnectDiscoveryData) {
+            CacheClientReconnectDiscoveryData reconnectData = (CacheClientReconnectDiscoveryData)joinData;
+
+            if (!disconnectedState()) {
+                processClientReconnectData(reconnectData, data.joiningNodeId());
+
+                return;
+            }
+
+            // Local node is disconnected, keep the request until local reconnect.
+            if (clientReconnectReqs == null)
+                clientReconnectReqs = new LinkedHashMap<>();
+
+            clientReconnectReqs.put(data.joiningNodeId(), reconnectData);
+        }
+        else if (joinData instanceof CacheJoinNodeDiscoveryData)
+            processJoiningNode((CacheJoinNodeDiscoveryData)joinData, data.joiningNodeId());
+    }
+
+    /**
+     * Adds reconnected client node to discovery for caches it had before disconnect. For caches which
+     * always survive reconnect the node is added without near cache flag, for other caches the node is added
+     * only if a cache with the same deployment ID is still registered.
+     *
+     * @param clientData Discovery data.
+     * @param clientNodeId Client node ID.
+     */
+    private void processClientReconnectData(CacheClientReconnectDiscoveryData clientData, UUID clientNodeId) {
+        for (CacheClientReconnectDiscoveryData.CacheInfo info : clientData.clientCaches().values()) {
+            String name = info.config().getName();
+
+            if (surviveReconnect(name)) {
+                ctx.discovery().addClientNode(name, clientNodeId, false);
+
+                continue;
+            }
+
+            DynamicCacheDescriptor desc = registeredCaches.get(name);
+
+            boolean sameDeployment = desc != null && desc.deploymentId().equals(info.deploymentId());
+
+            if (sameDeployment)
+                ctx.discovery().addClientNode(name, clientNodeId, info.nearCache());
+        }
+    }
+
+    /**
+     * Registers templates and caches configured on the joining node. A template or cache is registered only
+     * if no descriptor with the same name is registered yet; new descriptors are created as statically
+     * configured and received from the joining node, with deployment ID taken from join data.
+     * For newly registered caches a cache filter is set in discovery.
+     * <p>
+     * The joining node is added to discovery as client node for each cache from its join data, and,
+     * if join data requires to start caches, for all registered caches.
+     *
+     * @param joinData Joined node discovery data.
+     * @param nodeId Joined node ID.
+     */
+    private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) {
+        for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) {
+            CacheConfiguration<?, ?> cfg = cacheInfo.config();
+
+            if (!registeredTemplates.containsKey(cfg.getName())) {
+                DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
+                    cfg,
+                    cacheInfo.cacheType(),
+                    true,
+                    nodeId,
+                    true,
+                    joinData.cacheDeploymentId(),
+                    new QuerySchema(cfg.getQueryEntities()));
+
+                DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
+
+                assert old == null : old;
+            }
+        }
+
+        for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.caches().values()) {
+            CacheConfiguration<?, ?> cfg = cacheInfo.config();
+
+            if (!registeredCaches.containsKey(cfg.getName())) {
+                DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
+                    cfg,
+                    cacheInfo.cacheType(),
+                    false,
+                    nodeId,
+                    true,
+                    joinData.cacheDeploymentId(),
+                    new QuerySchema(cfg.getQueryEntities()));
+
+                DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
+
+                assert old == null : old;
+
+                ctx.discovery().setCacheFilter(
+                    cfg.getName(),
+                    cfg.getNodeFilter(),
+                    cfg.getNearConfiguration() != null,
+                    cfg.getCacheMode());
+            }
+
+            ctx.discovery().addClientNode(cfg.getName(), nodeId, cfg.getNearConfiguration() != null);
+        }
+
+        if (joinData.startCaches()) {
+            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                ctx.discovery().addClientNode(desc.cacheName(),
+                    nodeId,
+                    desc.cacheConfiguration().getNearConfiguration() != null);
+            }
+        }
+    }
+
+    /**
+     * Returns the map of registered cache descriptors itself (not a copy), so changes made
+     * through the returned map change the state of this instance.
+     *
+     * @return Registered caches.
+     */
+    ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches() {
+        return registeredCaches;
+    }
+
+    /**
+     * Returns the map of registered template descriptors itself (not a copy), so changes made
+     * through the returned map change the state of this instance.
+     *
+     * @return Registered cache templates.
+     */
+    ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates() {
+        return registeredTemplates;
+    }
+
+    /**
+     * Handles disconnect of the local node: saves a copy of currently registered caches in
+     * {@code cachesOnDisconnect} (non-null value of this field is what marks disconnected state),
+     * then clears registered caches and templates and drops saved client reconnect requests.
+     */
+    void onDisconnect() {
+        cachesOnDisconnect = new HashMap<>(registeredCaches);
+
+        registeredCaches.clear();
+        registeredTemplates.clear();
+
+        clientReconnectReqs = null;
+    }
+
+    /**
+     * Handles reconnect of the local node: finds caches which existed on disconnect and are stopped now,
+     * processes client reconnect requests saved while node was disconnected and leaves disconnected state.
+     * <p>
+     * A cache is considered stopped if it is not one of caches which always survive reconnect and either
+     * is not registered anymore or is registered with another deployment ID.
+     *
+     * @return Stopped caches names.
+     */
+    Set<String> onReconnected() {
+        assert disconnectedState();
+
+        Set<String> stoppedCaches = new HashSet<>();
+
+        for (Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.entrySet()) {
+            String cacheName = e.getKey();
+
+            DynamicCacheDescriptor oldDesc = e.getValue();
+
+            if (surviveReconnect(cacheName))
+                continue;
+
+            DynamicCacheDescriptor newDesc = registeredCaches.get(cacheName);
+
+            if (newDesc == null || !oldDesc.deploymentId().equals(newDesc.deploymentId()))
+                stoppedCaches.add(cacheName);
+        }
+
+        if (clientReconnectReqs != null) {
+            for (Map.Entry<UUID, CacheClientReconnectDiscoveryData> req : clientReconnectReqs.entrySet())
+                processClientReconnectData(req.getValue(), req.getKey());
+
+            clientReconnectReqs = null;
+        }
+
+        // Leave disconnected state.
+        cachesOnDisconnect = null;
+
+        return stoppedCaches;
+    }
+
+    /**
+     * Checks disconnected state, which is tracked by {@code cachesOnDisconnect}: the field is set
+     * in {@link #onDisconnect()} and reset in {@link #onReconnected()}.
+     *
+     * @return {@code True} if client node is currently in disconnected state.
+     */
+    private boolean disconnectedState() {
+        return cachesOnDisconnect != null;
+    }
+
+    /**
+     * Checks if cache is a system cache (utility or atomics cache) which is kept on client node disconnect.
+     *
+     * @param cacheName Cache name.
+     * @return {@code True} if cache with given name is system cache which should always survive client node
+     *      disconnect.
+     */
+    private boolean surviveReconnect(String cacheName) {
+        if (CU.isUtilityCache(cacheName))
+            return true;
+
+        return CU.isAtomicsCache(cacheName);
+    }
+
+    /**
+     * Removes all registered cache descriptors. Registered templates are not touched.
+     */
+    void clearCaches() {
+        registeredCaches.clear();
+    }
+}


[3/6] ignite git commit: Moved logic related to caches discovery data handling to ClusterCachesInfo. Start of statically configured caches in the same way as dynamic ones: from GridDhtPartitionsExchangeFuture.

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 87aaee0..06ad62d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -26,7 +25,6 @@ import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
@@ -61,13 +59,6 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.query.QuerySchema;
-import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage;
-import org.apache.ignite.internal.processors.query.schema.SchemaExchangeWorkerTask;
-import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask;
-import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
-import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteComponentType;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -110,14 +101,23 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.plugin.CachePluginManager;
+import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.schema.SchemaExchangeWorkerTask;
+import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -153,7 +153,6 @@ import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
 import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
 import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
 import static org.apache.ignite.configuration.DeploymentMode.SHARED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
 import static org.apache.ignite.internal.IgniteComponentType.JTA;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
@@ -193,11 +192,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Template configuration add futures. */
     private ConcurrentMap<String, IgniteInternalFuture> pendingTemplateFuts = new ConcurrentHashMap<>();
 
-    /** Dynamic caches. */
-    private ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
-
-    /** Cache templates. */
-    private ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
+    /** Registered cache descriptors holder; handles caches discovery data. */
+    private ClusterCachesInfo cachesInfo;
 
     /** */
     private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>();
@@ -208,12 +204,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Count down latch for caches. */
     private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
 
-    /** */
-    private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
-
-    /** */
-    private Map<UUID, DynamicCacheChangeBatch> clientReconnectReqs;
-
     /** Internal cache names. */
     private final Set<String> internalCaches;
 
@@ -391,16 +381,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param cc Cache Configuration.
      * @return {@code true} if cache is starting on client node and this node is affinity node for the cache.
      */
-    private boolean storesLocallyOnClient(IgniteConfiguration c,
-                                          CacheConfiguration cc) {
+    private boolean storesLocallyOnClient(IgniteConfiguration c, CacheConfiguration cc) {
         if (c.isClientMode() && c.getMemoryConfiguration() == null) {
             if (cc.getCacheMode() == LOCAL)
                 return true;
 
-            if (ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()))
-                return true;
+            return ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName());
 
-            return false;
         }
         else
             return false;
@@ -627,6 +614,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked"})
     @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
+        cachesInfo = new ClusterCachesInfo(ctx);
+
         DeploymentMode depMode = ctx.config().getDeploymentMode();
 
         if (!F.isEmpty(ctx.config().getCacheConfiguration())) {
@@ -647,72 +636,42 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         for (GridCacheSharedManager mgr : sharedCtx.managers())
             mgr.start(sharedCtx);
 
-        //if inActivate on start then skip registrate caches
-        if (!activeOnStart)
-            return;
-
-        CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
-
-        registerCacheFromConfig(cfgs);
+        if (activeOnStart && !ctx.config().isDaemon()) {
+            Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches = new HashMap<>();
 
-        registerCacheFromPersistentStore(cfgs);
-
-        if (log.isDebugEnabled())
-            log.debug("Started cache processor.");
-    }
+            Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates = new HashMap<>();
 
-    /**
-     * @param cfgs Cache configurations.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void registerCacheFromConfig(CacheConfiguration[] cfgs) throws IgniteCheckedException {
-        for (int i = 0; i < cfgs.length; i++) {
-            if (ctx.config().isDaemon())
-                continue;
+            addCacheOnJoinFromConfig(caches, templates);
 
-            CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
+            addCacheOnJoinFromPersistentStore(caches, templates);
 
-            cfgs[i] = cfg; // Replace original configuration value.
+            CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(IgniteUuid.randomUuid(),
+                caches,
+                templates,
+                startAllCachesOnClientStart());
 
-            registerCache(cfg);
+            cachesInfo.onStart(discoData);
         }
-    }
-
-    /**
-     * @param cfgs Cache configurations.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void registerCacheFromPersistentStore(CacheConfiguration[] cfgs) throws IgniteCheckedException {
-        if (sharedCtx.pageStore() != null &&
-            sharedCtx.database().persistenceEnabled() &&
-            !ctx.config().isDaemon()) {
-
-            Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames();
-
-            for (CacheConfiguration cfg : cfgs)
-                savedCacheNames.remove(cfg.getName());
-
-            for (String name : internalCaches)
-                savedCacheNames.remove(name);
-
-            if (!F.isEmpty(savedCacheNames)) {
-                log.info("Registrate persistent caches: " + savedCacheNames);
-
-                for (String name : savedCacheNames) {
-                    CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name);
-
-                    if (cfg != null)
-                        registerCache(cfg);
-                }
-            }
+        else {
+            cachesInfo.onStart(new CacheJoinNodeDiscoveryData(IgniteUuid.randomUuid(),
+                Collections.<String, CacheJoinNodeDiscoveryData.CacheInfo>emptyMap(),
+                Collections.<String, CacheJoinNodeDiscoveryData.CacheInfo>emptyMap(),
+                false));
         }
+
+        if (log.isDebugEnabled())
+            log.debug("Started cache processor.");
     }
 
     /**
      * @param cfg Cache configuration.
+     * @param caches Caches map.
+     * @param templates Templates map.
      * @throws IgniteCheckedException If failed.
      */
-    private void registerCache(CacheConfiguration<?, ?> cfg) throws IgniteCheckedException {
+    private void addCacheOnJoin(CacheConfiguration cfg,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates) throws IgniteCheckedException {
         CU.validateCacheName(cfg.getName());
 
         cloneCheckSerializable(cfg);
@@ -722,75 +681,88 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         // Initialize defaults.
         initialize(cfg, cacheObjCtx);
 
-        String cacheName = cfg.getName();
+        boolean template = cfg.getName().endsWith("*");
 
-        if (cacheDescriptor(cfg.getName()) != null) {
-            throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
-                "assign unique name to each cache): " + cacheName);
-        }
+        if (!template) {
+            if (caches.containsKey(cfg.getName())) {
+                throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
+                    "assign unique name to each cache): " + cfg.getName());
+            }
 
-        CacheType cacheType;
+            CacheType cacheType;
 
-        if (CU.isUtilityCache(cfg.getName()))
+            if (CU.isUtilityCache(cfg.getName()))
                 cacheType = CacheType.UTILITY;
             else if (internalCaches.contains(cfg.getName()))
                 cacheType = CacheType.INTERNAL;
             else
                 cacheType = CacheType.USER;
 
-        if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null)
-            cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName());
+            if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null)
+                cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName());
 
-        boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
+            if (!cacheType.userCache())
+                stopSeq.addLast(cfg.getName());
+            else
+                stopSeq.addFirst(cfg.getName());
 
-        DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
-            cfg,
-            cacheType,
-            template,
-            IgniteUuid.randomUuid(),
-            new QuerySchema(cfg.getQueryEntities()));
+            caches.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, cacheType, (byte)0));
+        }
+        else
+            templates.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, CacheType.USER, (byte)0));
+    }
 
-        desc.locallyConfigured(true);
-        desc.staticallyConfigured(true);
-        desc.receivedFrom(ctx.localNodeId());
+    /**
+     * @param caches Caches map.
+     * @param templates Templates map.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void addCacheOnJoinFromConfig(
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates
+    ) throws IgniteCheckedException {
+        assert !ctx.config().isDaemon();
 
-        if (!template) {
-            cacheDescriptor(cfg.getName(), desc);
+        CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
 
-            ctx.discovery().setCacheFilter(
-                cfg.getName(),
-                cfg.getNodeFilter(),
-                cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED,
-                cfg.getCacheMode());
+        for (int i = 0; i < cfgs.length; i++) {
+            CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
 
-            ctx.discovery().addClientNode(cfg.getName(),
-                ctx.localNodeId(),
-                cfg.getNearConfiguration() != null);
+            cfgs[i] = cfg; // Replace original configuration value.
 
-            if (!cacheType.userCache())
-                stopSeq.addLast(cfg.getName());
-            else
-                stopSeq.addFirst(cfg.getName());
+            addCacheOnJoin(cfg, caches, templates);
         }
-        else {
-            if (log.isDebugEnabled())
-                log.debug("Use cache configuration as template: " + cfg);
+    }
 
-            registeredTemplates.put(cacheName, desc);
-        }
+    /**
+     * @param caches Caches map.
+     * @param templates Templates map.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void addCacheOnJoinFromPersistentStore(
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates
+    ) throws IgniteCheckedException {
+        assert !ctx.config().isDaemon();
 
-        if (cfg.getName() == null) { // Use cache configuration with null name as template.
-            DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
-                cfg,
-                cacheType,
-                true,
-                IgniteUuid.randomUuid(),
-                new QuerySchema(cfg.getQueryEntities()));
+        if (sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) {
+            Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames();
+
+            savedCacheNames.removeAll(caches.keySet());
+
+            savedCacheNames.removeAll(internalCaches);
 
-            desc0.locallyConfigured(true);
-            desc0.staticallyConfigured(true);
+            if (!F.isEmpty(savedCacheNames)) {
+                if (log.isInfoEnabled())
+                    log.info("Register persistent caches: " + savedCacheNames);
+
+                for (String name : savedCacheNames) {
+                    CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name);
 
-            registeredTemplates.put(cacheName, desc0);
+                    if (cfg != null)
+                        addCacheOnJoin(cfg, caches, templates);
+                }
+            }
         }
     }
 
@@ -819,7 +791,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         ClusterNode locNode = ctx.discovery().localNode();
 
         try {
-            checkConsistency();
+            boolean checkConsistency =
+                !ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK);
+
+            if (checkConsistency)
+                checkConsistency();
+
+            cachesInfo.onKernalStart(checkConsistency);
 
             boolean currStatus = ctx.state().active();
 
@@ -865,88 +843,47 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             ctx.query().onCacheKernalStart();
 
-            // Start dynamic caches received from collect discovery data.
-            for (DynamicCacheDescriptor desc : cacheDescriptors()) {
-                if (ctx.config().isDaemon())
-                    continue;
-
-                desc.clearRemoteConfigurations();
-
-                CacheConfiguration ccfg = desc.cacheConfiguration();
-
-                IgnitePredicate filter = ccfg.getNodeFilter();
-
-                boolean loc = desc.locallyConfigured();
-
-                if (loc || (desc.receivedOnDiscovery() &&
-                    (startAllCachesOnClientStart() || CU.affinityNode(locNode, filter)))) {
-                    boolean started = desc.onStart();
-
-                    assert started : "Failed to change started flag for locally configured cache: " + desc;
-
-                    CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
-
-                    CachePluginManager pluginMgr = desc.pluginManager();
-
-                    GridCacheContext ctx = createCache(
-                        ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed());
-
-                    ctx.dynamicDeploymentId(desc.deploymentId());
-
-                    sharedCtx.addCacheContext(ctx);
-
-                    GridCacheAdapter cache = ctx.cache();
-
-                    String name = ccfg.getName();
-
-                    caches.put(name, cache);
-
-                    startCache(cache, desc.schema());
-
-                    jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, false));
-                }
+            for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) {
+                if (sharedCtx.database() != mgr)
+                    mgr.onKernalStart(false);
             }
         }
         finally {
             cacheStartedLatch.countDown();
         }
 
-        // Must call onKernalStart on shared managers after creation of fetched caches.
-        for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
-            if (sharedCtx.database() != mgr)
-                mgr.onKernalStart(false);
-
         // Escape if start active on start false
         if (!activeOnStart)
             return;
 
-        for (GridCacheAdapter<?, ?> cache : caches.values())
-            onKernalStart(cache);
-
         if (!ctx.config().isDaemon())
             ctx.cacheObjects().onUtilityCacheStarted();
 
         ctx.service().onUtilityCacheStarted();
 
-        // Wait for caches in SYNC preload mode.
-        for (DynamicCacheDescriptor desc : cacheDescriptors()) {
-            CacheConfiguration cfg = desc.cacheConfiguration();
+        final AffinityTopologyVersion startTopVer =
+            new AffinityTopologyVersion(ctx.discovery().localJoinEvent().topologyVersion(), 0);
 
-            IgnitePredicate filter = cfg.getNodeFilter();
+        final List<IgniteInternalFuture> syncFuts = new ArrayList<>(caches.size());
 
-            if (desc.locallyConfigured() || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
-                GridCacheAdapter cache = caches.get(cfg.getName());
+        sharedCtx.forAllCaches(new CIX1<GridCacheContext>() {
+            @Override public void applyx(GridCacheContext cctx) throws IgniteCheckedException {
+                CacheConfiguration cfg = cctx.config();
 
-                if (cache != null) {
-                    if (cfg.getRebalanceMode() == SYNC) {
-                        CacheMode cacheMode = cfg.getCacheMode();
+                if (cctx.affinityNode() &&
+                    cfg.getRebalanceMode() == SYNC &&
+                    startTopVer.equals(cctx.startTopologyVersion())) {
+                    CacheMode cacheMode = cfg.getCacheMode();
 
-                        if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0))
-                            cache.preloader().syncFuture().get();
-                    }
+                    if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0))
+                        // Only collect the future here and wait after forAllCaches() returns to avoid a deadlock.
+                        syncFuts.add(cctx.preloader().syncFuture());
                 }
             }
-        }
+        });
+
+        for (int i = 0, size = syncFuts.size(); i < size; i++)
+            syncFuts.get(i).get();
 
         assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started";
 
@@ -962,38 +899,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     *
+     * @throws IgniteCheckedException If check failed.
      */
     private void checkConsistency() throws IgniteCheckedException {
-        if (!ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
-            for (ClusterNode n : ctx.discovery().remoteNodes()) {
-                if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED))
-                    continue;
-
-                checkTransactionConfiguration(n);
-
-                DeploymentMode locDepMode = ctx.config().getDeploymentMode();
-                DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
-
-                CU.checkAttributeMismatch(
-                    log, null, n.id(), "deploymentMode", "Deployment mode",
-                    locDepMode, rmtDepMode, true);
-
-                for (DynamicCacheDescriptor desc : cacheDescriptors()) {
-                    CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
-
-                    if (rmtCfg != null) {
-                        CacheConfiguration locCfg = desc.cacheConfiguration();
+        for (ClusterNode n : ctx.discovery().remoteNodes()) {
+            if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED))
+                continue;
 
-                        checkCache(locCfg, rmtCfg, n);
+            checkTransactionConfiguration(n);
 
-                        // Check plugin cache configurations.
-                        CachePluginManager pluginMgr = desc.pluginManager();
+            DeploymentMode locDepMode = ctx.config().getDeploymentMode();
+            DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
 
-                        pluginMgr.validateRemotes(rmtCfg, n);
-                    }
-                }
-            }
+            CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
+                locDepMode, rmtDepMode, true);
         }
     }
 
@@ -1034,7 +953,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 stopCache(cache, cancel, false);
         }
 
-        registeredCaches.clear();
+        cachesInfo.clearCaches();
     }
 
     /**
@@ -1105,8 +1024,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
-        cachesOnDisconnect = new HashMap<>(registeredCaches);
-
         IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
             ctx.cluster().clientReconnectFuture(),
             "Failed to execute dynamic cache change request, client node disconnected.");
@@ -1133,9 +1050,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         sharedCtx.onDisconnected(reconnectFut);
 
-        registeredCaches.clear();
-
-        registeredTemplates.clear();
+        cachesInfo.onDisconnect();
     }
 
     /** {@inheritDoc} */
@@ -1144,24 +1059,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         GridCompoundFuture<?, ?> stopFut = null;
 
-        for (final GridCacheAdapter cache : caches.values()) {
-            String name = cache.name();
-
-            boolean stopped;
-
-            boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name);
+        Set<String> stoppedCaches = cachesInfo.onReconnected();
 
-            if (!sysCache) {
-                DynamicCacheDescriptor oldDesc = cachesOnDisconnect.get(name);
-
-                assert oldDesc != null : "No descriptor for cache: " + name;
-
-                DynamicCacheDescriptor newDesc = cacheDescriptor(name);
-
-                stopped = newDesc == null || !oldDesc.deploymentId().equals(newDesc.deploymentId());
-            }
-            else
-                stopped = false;
+        for (final GridCacheAdapter cache : caches.values()) {
+            boolean stopped = stoppedCaches.contains(cache.name());
 
             if (stopped) {
                 cache.context().gate().reconnected(true);
@@ -1188,11 +1089,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 reconnected.add(cache);
 
-                if (!sysCache) {
+                if (cache.context().userCache()) {
                     // Re-create cache structures inside indexing in order to apply recent schema changes.
                     GridCacheContext cctx = cache.context();
 
-                    DynamicCacheDescriptor desc = cacheDescriptor(name);
+                    DynamicCacheDescriptor desc = cacheDescriptor(cctx.name());
 
                     assert desc != null;
 
@@ -1202,20 +1103,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             }
         }
 
-        if (clientReconnectReqs != null) {
-            for (Map.Entry<UUID, DynamicCacheChangeBatch> e : clientReconnectReqs.entrySet())
-                processClientReconnectData(e.getKey(), e.getValue());
-
-            clientReconnectReqs = null;
-        }
-
         sharedCtx.onReconnected();
 
         for (GridCacheAdapter cache : reconnected)
             cache.context().gate().reconnected(false);
 
-        cachesOnDisconnect = null;
-
         if (stopFut != null)
             stopFut.markInitialized();
 
@@ -1438,16 +1330,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * @param cfg Cache configuration to use to create cache.
      * @param pluginMgr Cache plugin manager.
-     * @param cacheType Cache type.
+     * @param desc Cache descriptor.
+     * @param locStartTopVer Topology version on which the cache is started on local node.
      * @param cacheObjCtx Cache object context.
+     * @param affNode {@code True} if local node is an affinity node for the cache.
      * @param updatesAllowed Updates allowed flag.
      * @return Cache context.
      * @throws IgniteCheckedException If failed to create cache.
      */
     private GridCacheContext createCache(CacheConfiguration<?, ?> cfg,
         @Nullable CachePluginManager pluginMgr,
-        CacheType cacheType,
+        DynamicCacheDescriptor desc,
+        AffinityTopologyVersion locStartTopVer,
         CacheObjectContext cacheObjCtx,
+        boolean affNode,
         boolean updatesAllowed)
         throws IgniteCheckedException {
         assert cfg != null;
@@ -1465,7 +1361,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         QueryUtils.prepareCacheConfiguration(cfg);
 
-        validate(ctx.config(), cfg, cacheType, cfgStore);
+        validate(ctx.config(), cfg, desc.cacheType(), cfgStore);
 
         if (pluginMgr == null)
             pluginMgr = new CachePluginManager(ctx, cfg);
@@ -1475,7 +1371,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         sharedCtx.jta().registerCache(cfg);
 
         // Skip suggestions for internal caches.
-        if (cacheType.userCache())
+        if (desc.cacheType().userCache())
             suggestOptimizations(cfg, cfgStore != null);
 
         Collection<Object> toPrepare = new ArrayList<>();
@@ -1508,8 +1404,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         storeMgr.initialize(cfgStore, sesHolders);
 
-        boolean affNode = cfg.getCacheMode() == LOCAL || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter());
-
         String memPlcName = cfg.getMemoryPolicyName();
 
         MemoryPolicy memPlc = sharedCtx.database().memoryPolicy(memPlcName);
@@ -1520,7 +1414,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             ctx,
             sharedCtx,
             cfg,
-            cacheType,
+            desc.cacheType(),
+            locStartTopVer,
+            desc.receivedFrom(),
             affNode,
             updatesAllowed,
             memPlc,
@@ -1651,7 +1547,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 ctx,
                 sharedCtx,
                 cfg,
-                cacheType,
+                desc.cacheType(),
+                locStartTopVer,
+                desc.receivedFrom(),
                 affNode,
                 true,
                 memPlc,
@@ -1738,17 +1636,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Collection of started cache names.
      */
     public Collection<String> cacheNames() {
-        return F.viewReadOnly(cacheDescriptors(),
-            new IgniteClosure<DynamicCacheDescriptor, String>() {
-                @Override public String apply(DynamicCacheDescriptor desc) {
-                    return desc.cacheConfiguration().getName();
-                }
-            },
-            new IgnitePredicate<DynamicCacheDescriptor>() {
-                @Override public boolean apply(DynamicCacheDescriptor desc) {
-                    return desc.started();
-                }
-            });
+        return F.viewReadOnly(cacheDescriptors(), new IgniteClosure<DynamicCacheDescriptor, String>() {
+            @Override public String apply(DynamicCacheDescriptor desc) {
+                return desc.cacheConfiguration().getName();
+            }
+        });
     }
 
     /**
@@ -1771,7 +1663,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         }
 
         if (start) {
-            for (Map.Entry<String, DynamicCacheDescriptor> e : registeredCaches.entrySet()) {
+            for (Map.Entry<String, DynamicCacheDescriptor> e : cachesInfo.registeredCaches().entrySet()) {
                 DynamicCacheDescriptor desc = e.getValue();
 
                 CacheConfiguration ccfg = desc.cacheConfiguration();
@@ -1822,140 +1714,138 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param req Cache start request.
-     * @param topVer Topology version.
+     * @param cacheDesc Cache descriptor.
+     * @param nearCfg Near cache configuration.
+     * @param exchTopVer Current exchange version.
      * @throws IgniteCheckedException If failed.
      */
-    public void prepareCacheStart(DynamicCacheChangeRequest req, AffinityTopologyVersion topVer)
+    void prepareCacheStart(DynamicCacheDescriptor cacheDesc,
+        @Nullable NearCacheConfiguration nearCfg,
+        AffinityTopologyVersion exchTopVer)
         throws IgniteCheckedException {
-        assert req.start() : req;
-        assert req.cacheType() != null : req;
+        prepareCacheStart(
+            cacheDesc.cacheConfiguration(),
+            nearCfg,
+            cacheDesc,
+            exchTopVer,
+            cacheDesc.schema()
+        );
+    }
 
-        DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName());
+    /**
+     * @param exchTopVer Current exchange version.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void startCachesOnLocalJoin(AffinityTopologyVersion exchTopVer) throws IgniteCheckedException {
+        List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches = cachesInfo.cachesToStartOnLocalJoin();
 
-        if (desc != null)
-            desc.onStart();
+        if (!F.isEmpty(caches)) {
+            for (T2<DynamicCacheDescriptor, NearCacheConfiguration> t : caches) {
+                DynamicCacheDescriptor desc = t.get1();
 
-        prepareCacheStart(
-            req.startCacheConfiguration(),
-            req.nearCacheConfiguration(),
-            req.cacheType(),
-            req.clientStartOnly(),
-            req.initiatingNodeId(),
-            req.deploymentId(),
-            topVer,
-            desc != null ? desc.schema() : null
-        );
+                prepareCacheStart(
+                    desc.cacheConfiguration(),
+                    t.get2(),
+                    desc,
+                    exchTopVer,
+                    desc.schema()
+                );
+            }
+        }
     }
 
     /**
      * Starts statically configured caches received from remote nodes during exchange.
      *
-     * @param topVer Topology version.
+     * @param nodeId Joining node ID.
+     * @param exchTopVer Current exchange version.
      * @return Started caches descriptors.
      * @throws IgniteCheckedException If failed.
      */
-    public Collection<DynamicCacheDescriptor> startReceivedCaches(AffinityTopologyVersion topVer)
+    public Collection<DynamicCacheDescriptor> startReceivedCaches(UUID nodeId, AffinityTopologyVersion exchTopVer)
         throws IgniteCheckedException {
-        List<DynamicCacheDescriptor> started = null;
-
-        for (DynamicCacheDescriptor desc : cacheDescriptors()) {
-            if (!desc.started() && desc.staticallyConfigured() && !desc.locallyConfigured()) {
-                if (desc.receivedFrom() != null) {
-                    AffinityTopologyVersion startVer = desc.receivedFromStartVersion();
+        List<DynamicCacheDescriptor> started = cachesInfo.cachesReceivedFromJoin(nodeId);
 
-                    if (startVer == null || startVer.compareTo(topVer) > 0)
-                        continue;
-                }
-
-                if (desc.onStart()) {
-                    if (started == null)
-                        started = new ArrayList<>();
-
-                    started.add(desc);
+        if (started != null) {
+            for (DynamicCacheDescriptor desc : started) {
+                IgnitePredicate<ClusterNode> filter = desc.cacheConfiguration().getNodeFilter();
 
+                if (CU.affinityNode(ctx.discovery().localNode(), filter)) {
                     prepareCacheStart(
                         desc.cacheConfiguration(),
                         null,
-                        desc.cacheType(),
-                        false,
-                        null,
-                        desc.deploymentId(),
-                        topVer,
+                        desc,
+                        exchTopVer,
                         desc.schema()
                     );
                 }
             }
         }
 
-        return started;
+        return started != null ? started : Collections.<DynamicCacheDescriptor>emptyList();
     }
 
     /**
-     * @param cfg Start configuration.
-     * @param nearCfg Near configuration.
-     * @param cacheType Cache type.
-     * @param clientStartOnly Client only start request.
-     * @param initiatingNodeId Initiating node ID.
-     * @param deploymentId Deployment ID.
-     * @param topVer Topology version.
+     * @param startCfg Start configuration.
+     * @param reqNearCfg Near configuration if specified for client cache start request.
+     * @param desc Cache descriptor.
+     * @param exchTopVer Current exchange version.
      * @param schema Query schema.
      * @throws IgniteCheckedException If failed.
      */
     private void prepareCacheStart(
-        CacheConfiguration cfg,
-        NearCacheConfiguration nearCfg,
-        CacheType cacheType,
-        boolean clientStartOnly,
-        UUID initiatingNodeId,
-        IgniteUuid deploymentId,
-        AffinityTopologyVersion topVer,
+        CacheConfiguration startCfg,
+        @Nullable NearCacheConfiguration reqNearCfg,
+        DynamicCacheDescriptor desc,
+        AffinityTopologyVersion exchTopVer,
         @Nullable QuerySchema schema
     ) throws IgniteCheckedException {
-        CacheConfiguration ccfg = new CacheConfiguration(cfg);
-
-        IgnitePredicate nodeFilter = ccfg.getNodeFilter();
+        assert !caches.containsKey(startCfg.getName()) : startCfg.getName();
 
-        ClusterNode locNode = ctx.discovery().localNode();
+        CacheConfiguration ccfg = new CacheConfiguration(startCfg);
 
-        boolean affNodeStart = !clientStartOnly && CU.affinityNode(locNode, nodeFilter);
-        boolean clientNodeStart = locNode.id().equals(initiatingNodeId);
+        CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
-        if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null)
-            return;
+        boolean affNode;
 
-        if (affNodeStart || clientNodeStart || CU.isSystemCache(cfg.getName())) {
-            if (clientNodeStart && !affNodeStart) {
-                if (nearCfg != null)
-                    ccfg.setNearConfiguration(nearCfg);
-                else
-                    ccfg.setNearConfiguration(null);
-            }
+        if (ccfg.getCacheMode() == LOCAL) {
+            affNode = true;
 
-            CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
+            ccfg.setNearConfiguration(null);
+        }
+        else if (CU.affinityNode(ctx.discovery().localNode(), ccfg.getNodeFilter()))
+            affNode = true;
+        else {
+            affNode = false;
 
-            GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true);
+            ccfg.setNearConfiguration(reqNearCfg);
+        }
 
-            cacheCtx.startTopologyVersion(topVer);
+        GridCacheContext cacheCtx = createCache(ccfg,
+            null,
+            desc,
+            exchTopVer,
+            cacheObjCtx,
+            affNode,
+            true);
 
-            cacheCtx.dynamicDeploymentId(deploymentId);
+        cacheCtx.dynamicDeploymentId(desc.deploymentId());
 
-            GridCacheAdapter cache = cacheCtx.cache();
+        GridCacheAdapter cache = cacheCtx.cache();
 
-            sharedCtx.addCacheContext(cacheCtx);
+        sharedCtx.addCacheContext(cacheCtx);
 
-            caches.put(cacheCtx.name(), cache);
+        caches.put(cacheCtx.name(), cache);
 
-            startCache(cache, schema != null ? schema : new QuerySchema());
+        startCache(cache, schema != null ? schema : new QuerySchema());
 
-            onKernalStart(cache);
-        }
+        onKernalStart(cache);
     }
 
     /**
      * @param req Stop request.
      */
-    public void blockGateway(DynamicCacheChangeRequest req) {
+    void blockGateway(DynamicCacheChangeRequest req) {
         assert req.stop() || req.close();
 
         if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) {
@@ -1997,9 +1887,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             sharedCtx.removeCacheContext(ctx);
 
-            assert req.deploymentId().equals(ctx.dynamicDeploymentId()) : "Different deployment IDs [req=" + req +
-                ", ctxDepId=" + ctx.dynamicDeploymentId() + ']';
-
             onKernalStop(cache, req.destroy());
 
             stopCache(cache, true, req.destroy());
@@ -2010,52 +1897,52 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * Callback invoked when first exchange future for dynamic cache is completed.
      *
      * @param topVer Completed topology version.
-     * @param reqs Change requests.
+     * @param exchActions Change requests.
      * @param err Error.
      */
     @SuppressWarnings("unchecked")
     public void onExchangeDone(
         AffinityTopologyVersion topVer,
-        Collection<DynamicCacheChangeRequest> reqs,
+        @Nullable ExchangeActions exchActions,
         Throwable err
     ) {
         for (GridCacheAdapter<?, ?> cache : caches.values()) {
             GridCacheContext<?, ?> cacheCtx = cache.context();
 
-            if (F.eq(cacheCtx.startTopologyVersion(), topVer)) {
+            if (cacheCtx.startTopologyVersion().equals(topVer)) {
+                jCacheProxies.put(cacheCtx.name(), new IgniteCacheProxy(cache.context(), cache, null, false));
+
                 if (cacheCtx.preloader() != null)
                     cacheCtx.preloader().onInitialExchangeComplete(err);
+            }
+        }
 
-                String masked = cacheCtx.name();
+        if (exchActions != null && err == null) {
+            for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
+                stopGateway(action.request());
 
-                jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false));
+                prepareCacheStop(action.request());
             }
-        }
 
-        if (!F.isEmpty(reqs) && err == null) {
-            for (DynamicCacheChangeRequest req : reqs) {
-                String masked = req.cacheName();
+            for (DynamicCacheChangeRequest req : exchActions.closeRequests(ctx.localNodeId())) {
+                String cacheName = req.cacheName();
 
-                if (req.stop()) {
-                    stopGateway(req);
+                IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName);
 
-                    prepareCacheStop(req);
-                }
-                else if (req.close() && req.initiatingNodeId().equals(ctx.localNodeId())) {
-                    IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(masked);
+                if (proxy != null) {
+                    if (proxy.context().affinityNode()) {
+                        GridCacheAdapter<?, ?> cache = caches.get(cacheName);
 
-                    if (proxy != null) {
-                        if (proxy.context().affinityNode()) {
-                            GridCacheAdapter<?, ?> cache = caches.get(masked);
+                        assert cache != null : cacheName;
 
-                            if (cache != null)
-                                jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false));
-                        }
-                        else {
-                            proxy.context().gate().onStopped();
+                        jCacheProxies.put(cacheName, new IgniteCacheProxy(cache.context(), cache, null, false));
+                    }
+                    else {
+                        jCacheProxies.remove(cacheName);
 
-                            prepareCacheStop(req);
-                        }
+                        proxy.context().gate().onStopped();
+
+                        prepareCacheStop(req);
                     }
                 }
             }
@@ -2063,20 +1950,31 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param req Request to complete future for.
+     * @param cacheName Cache name.
+     * @param deploymentId Future deployment ID.
      */
-    public void completeStartFuture(DynamicCacheChangeRequest req) {
-        DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
-
-        assert req.deploymentId() != null || req.globalStateChange() || req.resetLostPartitions();
-        assert fut == null || fut.deploymentId != null || req.globalStateChange() || req.resetLostPartitions();
+    void completeTemplateAddFuture(String cacheName, IgniteUuid deploymentId) {
+        GridCacheProcessor.TemplateConfigurationFuture fut =
+            (GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(cacheName);
 
-        if (fut != null && F.eq(fut.deploymentId(), req.deploymentId()) &&
-            F.eq(req.initiatingNodeId(), ctx.localNodeId()))
+        if (fut != null && fut.deploymentId().equals(deploymentId))
             fut.onDone();
     }
 
     /**
+     * @param req Request to complete future for.
+     * @param err Error if any.
+     */
+    void completeCacheStartFuture(DynamicCacheChangeRequest req, @Nullable Exception err) {
+        if (req.initiatingNodeId().equals(ctx.localNodeId())) {
+            DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
+
+            if (fut != null)
+                fut.onDone(null, err);
+        }
+    }
+
+    /**
      * Creates shared context.
      *
      * @param kernalCtx Kernal context.
@@ -2132,322 +2030,40 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
-        dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId()));
+        cachesInfo.collectJoiningNodeData(dataBag);
     }
 
     /** {@inheritDoc} */
     @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
-        dataBag.addNodeSpecificData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId()));
+        cachesInfo.collectGridNodeData(dataBag);
     }
 
     /**
-     * @param joiningNodeId Joining node id.
+     * @return {@code True} if need locally start all existing caches on client node start.
      */
-    private Serializable getDiscoveryData(UUID joiningNodeId) {
-        boolean reconnect = ctx.localNodeId().equals(joiningNodeId) && cachesOnDisconnect != null;
+    private boolean startAllCachesOnClientStart() {
+        return START_CLIENT_CACHES && ctx.clientNode();
+    }
 
-        // Collect dynamically started caches to a single object.
-        Collection<DynamicCacheChangeRequest> reqs;
+    /** {@inheritDoc} */
+    @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
+        cachesInfo.onJoiningNodeDataReceived(data);
+    }
 
-        Map<String, Map<UUID, Boolean>> clientNodesMap;
+    /** {@inheritDoc} */
+    @Override public void onGridDataReceived(GridDiscoveryData data) {
+        cachesInfo.onGridDataReceived(data);
+    }
 
-        if (reconnect) {
-            reqs = new ArrayList<>(caches.size() + 1);
-
-            clientNodesMap = U.newHashMap(caches.size());
-
-            collectDataOnReconnectingNode(reqs, clientNodesMap, joiningNodeId);
-        }
-        else {
-            reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size() + 1);
-
-            clientNodesMap = ctx.discovery().clientNodesMap();
-
-            collectDataOnGridNode(reqs);
-        }
-
-        DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs);
-
-        batch.clientNodes(clientNodesMap);
-
-        batch.clientReconnect(reconnect);
-
-        if (ctx.localNodeId().equals(joiningNodeId))
-            batch.startCaches(startAllCachesOnClientStart());
-
-        // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same.
-        batch.id(null);
-
-        return batch;
-    }
-
-    /**
-     * @param reqs requests.
-     */
-    private void collectDataOnGridNode(Collection<DynamicCacheChangeRequest> reqs) {
-        for (DynamicCacheDescriptor desc : cacheDescriptors()) {
-            // RequestId must be null because on different node will be different byte [] and
-            // we get duplicate discovery data, for more details see
-            // TcpDiscoveryNodeAddedMessage#addDiscoveryData.
-            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, desc.cacheConfiguration().getName(),
-                null);
-
-            req.startCacheConfiguration(desc.cacheConfiguration());
-            req.cacheType(desc.cacheType());
-            req.deploymentId(desc.deploymentId());
-            req.receivedFrom(desc.receivedFrom());
-            req.schema(desc.schema());
-
-            reqs.add(req);
-        }
-
-        for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
-            // RequestId must be null because on different node will be different byte [] and
-            // we get duplicate discovery data, for more details see
-            // TcpDiscoveryNodeAddedMessage#addDiscoveryData.
-            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, desc.cacheConfiguration().getName(),
-                null);
-
-            req.startCacheConfiguration(desc.cacheConfiguration());
-            req.schema(desc.schema());
-
-            req.template(true);
-
-            reqs.add(req);
-        }
-    }
-
-    /**
-     * @param reqs requests.
-     * @param clientNodesMap Client nodes map.
-     * @param nodeId Node id.
-     */
-    private void collectDataOnReconnectingNode(
-            Collection<DynamicCacheChangeRequest> reqs,
-            Map<String, Map<UUID, Boolean>> clientNodesMap,
-            UUID nodeId
-    ) {
-        for (GridCacheAdapter<?, ?> cache : caches.values()) {
-            DynamicCacheDescriptor desc = cachesOnDisconnect.get(cache.name());
-
-            if (desc == null)
-                continue;
-
-            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, cache.name(), null);
-
-            req.startCacheConfiguration(desc.cacheConfiguration());
-            req.cacheType(desc.cacheType());
-            req.deploymentId(desc.deploymentId());
-            req.receivedFrom(desc.receivedFrom());
-            req.schema(desc.schema());
-
-            reqs.add(req);
-
-            Boolean nearEnabled = cache.isNear();
-
-            Map<UUID, Boolean> map = U.newHashMap(1);
-
-            map.put(nodeId, nearEnabled);
-
-            clientNodesMap.put(cache.name(), map);
-        }
-    }
-
-    /**
-     * @return {@code True} if need locally start all existing caches on client node start.
-     */
-    private boolean startAllCachesOnClientStart() {
-        return START_CLIENT_CACHES && ctx.clientNode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
-        if (data.hasJoiningNodeData()) {
-            Serializable joiningNodeData = data.joiningNodeData();
-            if (joiningNodeData instanceof DynamicCacheChangeBatch)
-                onDiscoDataReceived(
-                        data.joiningNodeId(),
-                        data.joiningNodeId(),
-                        (DynamicCacheChangeBatch) joiningNodeData, true);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onGridDataReceived(GridDiscoveryData data) {
-        Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
-
-        if (nodeSpecData != null) {
-            for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet()) {
-                if (e.getValue() != null && e.getValue() instanceof DynamicCacheChangeBatch) {
-                    DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch) e.getValue();
-
-                    onDiscoDataReceived(data.joiningNodeId(), e.getKey(), batch, false);
-                }
-            }
-        }
-    }
-
-    /**
-     * @param joiningNodeId Joining node id.
-     * @param rmtNodeId Rmt node id.
-     * @param batch Batch.
-     * @param join Whether this is data from joining node.
-     */
-    private void onDiscoDataReceived(UUID joiningNodeId, UUID rmtNodeId, DynamicCacheChangeBatch batch, boolean join) {
-        if (batch.clientReconnect()) {
-            if (ctx.clientDisconnected()) {
-                if (clientReconnectReqs == null)
-                    clientReconnectReqs = new LinkedHashMap<>();
-
-                clientReconnectReqs.put(joiningNodeId, batch);
-
-                return;
-            }
-
-            processClientReconnectData(joiningNodeId, batch);
-        }
-        else {
-            for (DynamicCacheChangeRequest req : batch.requests()) {
-                initReceivedCacheConfiguration(req);
-
-                if (req.template()) {
-                    CacheConfiguration ccfg = req.startCacheConfiguration();
-
-                    assert ccfg != null : req;
-
-                    DynamicCacheDescriptor existing = registeredTemplates.get(req.cacheName());
-
-                    if (existing == null) {
-                        DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
-                                ctx,
-                                ccfg,
-                                req.cacheType(),
-                                true,
-                                req.deploymentId(),
-                                req.schema());
-
-                        registeredTemplates.put(req.cacheName(), desc);
-                    }
-
-                    continue;
-                }
-
-                DynamicCacheDescriptor existing = cacheDescriptor(req.cacheName());
-
-                if (req.start() && !req.clientStartOnly()) {
-                    CacheConfiguration ccfg = req.startCacheConfiguration();
-
-                    if (existing != null) {
-                        if (joiningNodeId.equals(ctx.localNodeId())) {
-                            existing.receivedFrom(req.receivedFrom());
-                            existing.deploymentId(req.deploymentId());
-                        }
-
-                        if (existing.locallyConfigured()) {
-                            existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
-
-                            if (!join)
-                                // Overwrite existing with remote.
-                                existing.schema(req.schema());
-
-                            ctx.discovery().setCacheFilter(
-                                    req.cacheName(),
-                                    ccfg.getNodeFilter(),
-                                    ccfg.getNearConfiguration() != null,
-                                    ccfg.getCacheMode());
-                        }
-                    }
-                    else {
-                        assert req.cacheType() != null : req;
-
-                        DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
-                                ctx,
-                                ccfg,
-                                req.cacheType(),
-                                false,
-                                req.deploymentId(),
-                                req.schema());
-
-                        // Received statically configured cache.
-                        if (req.initiatingNodeId() == null)
-                            desc.staticallyConfigured(true);
-
-                        if (joiningNodeId.equals(ctx.localNodeId()))
-                            desc.receivedOnDiscovery(true);
-
-                        desc.receivedFrom(req.receivedFrom());
-
-                        DynamicCacheDescriptor old = cacheDescriptor(req.cacheName(), desc);
-
-                        assert old == null : old;
-
-                        ctx.discovery().setCacheFilter(
-                                req.cacheName(),
-                                ccfg.getNodeFilter(),
-                                ccfg.getNearConfiguration() != null,
-                                ccfg.getCacheMode());
-                    }
-                }
-            }
-
-            if (!F.isEmpty(batch.clientNodes())) {
-                for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) {
-                    String cacheName = entry.getKey();
-
-                    for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
-                        ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
-                }
-            }
-
-            if (batch.startCaches()) {
-                for (Map.Entry<String, DynamicCacheDescriptor> entry : registeredCaches.entrySet())
-                    ctx.discovery().addClientNode(entry.getKey(), joiningNodeId, false);
-            }
-        }
-    }
-
-    /**
-     * @param clientNodeId Client node ID.
-     * @param batch Cache change batch.
-     */
-    private void processClientReconnectData(UUID clientNodeId, DynamicCacheChangeBatch batch) {
-        assert batch.clientReconnect() : batch;
-
-        for (DynamicCacheChangeRequest req : batch.requests()) {
-            assert !req.template() : req;
-
-            initReceivedCacheConfiguration(req);
-
-            String name = req.cacheName();
-
-            boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name);
-
-            if (!sysCache) {
-                DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName());
-
-                if (desc != null && desc.deploymentId().equals(req.deploymentId())) {
-                    Map<UUID, Boolean> nodes = batch.clientNodes().get(name);
-
-                    assert nodes != null : req;
-                    assert nodes.containsKey(clientNodeId) : nodes;
-
-                    ctx.discovery().addClientNode(req.cacheName(), clientNodeId, nodes.get(clientNodeId));
-                }
-            }
-            else
-                ctx.discovery().addClientNode(req.cacheName(), clientNodeId, false);
-        }
-    }
-
-    /**
-     * Dynamically starts cache using template configuration.
-     *
-     * @param cacheName Cache name.
-     * @return Future that will be completed when cache is deployed.
-     */
-    public IgniteInternalFuture<?> createFromTemplate(String cacheName) {
-        try {
-            CacheConfiguration cfg = createConfigFromTemplate(cacheName);
+    /**
+     * Dynamically starts cache using template configuration.
+     *
+     * @param cacheName Cache name.
+     * @return Future that will be completed when cache is deployed.
+     */
+    public IgniteInternalFuture<?> createFromTemplate(String cacheName) {
+        try {
+            CacheConfiguration cfg = createConfigFromTemplate(cacheName);
 
             return dynamicStartCache(cfg, cacheName, null, true, true, true);
         }
@@ -2491,7 +2107,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         List<CacheConfiguration> wildcardNameCfgs = null;
 
-        for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
+        for (DynamicCacheDescriptor desc : cachesInfo.registeredTemplates().values()) {
             assert desc.template();
 
             CacheConfiguration cfg = desc.cacheConfiguration();
@@ -2703,12 +2319,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (checkThreadTx)
             checkEmptyTransactions();
 
-        DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
-
-        t.stop(true);
-        t.destroy(true);
+        DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, true);
 
-        return F.first(initiateCacheChanges(F.asList(t), false));
+        return F.first(initiateCacheChanges(F.asList(req), false));
     }
 
     /**
@@ -2723,11 +2336,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         List<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheNames.size());
 
         for (String cacheName : cacheNames) {
-            DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
+            DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, true);
 
-            t.stop(true);
-
-            reqs.add(t);
+            reqs.add(req);
         }
 
         GridCompoundFuture<?, ?> compoundFut = new GridCompoundFuture<>();
@@ -2744,7 +2355,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param cacheName Cache name to close.
      * @return Future that will be completed when cache is closed.
      */
-    public IgniteInternalFuture<?> dynamicCloseCache(String cacheName) {
+    IgniteInternalFuture<?> dynamicCloseCache(String cacheName) {
         assert cacheName != null;
 
         IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName);
@@ -2754,9 +2365,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         checkEmptyTransactions();
 
-        DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
-
-        t.close(true);
+        DynamicCacheChangeRequest t = DynamicCacheChangeRequest.closeRequest(ctx, cacheName);
 
         return F.first(initiateCacheChanges(F.asList(t), false));
     }
@@ -2771,7 +2380,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         checkEmptyTransactions();
 
         if (F.isEmpty(cacheNames))
-            cacheNames = registeredCaches.keySet();
+            cacheNames = cachesInfo.registeredCaches().keySet();
 
         Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheNames.size());
 
@@ -2779,16 +2388,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             DynamicCacheDescriptor desc = cacheDescriptor(cacheName);
 
             if (desc == null) {
-                log.warning("Reset lost partition will not be executed, " +
-                    "because cache with name:" + cacheName + " doesn't not exist");
+                U.warn(log, "Failed to find cache for reset lost partition request, cache does not exist: " + cacheName);
 
                 continue;
             }
 
-            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
-                UUID.randomUUID(), cacheName, ctx.localNodeId());
-
-            req.markResetLostPartitions();
+            DynamicCacheChangeRequest req = DynamicCacheChangeRequest.resetLostPartitions(ctx, cacheName);
 
             reqs.add(req);
         }
@@ -2841,14 +2446,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
 
         for (String cacheName : cacheNames()) {
-            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
-                UUID.randomUUID(), cacheName, ctx.localNodeId());
-
-            DynamicCacheDescriptor desc = cacheDescriptor(cacheName);
-
-            req.deploymentId(desc.deploymentId());
-            req.stop(true);
-            req.destroy(false);
+            DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, false);
 
             reqs.add(req);
         }
@@ -2876,11 +2474,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         String cacheName = cfg.getName();
 
-        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
-            UUID.randomUUID(), cacheName, ctx.localNodeId());
+        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
 
         req.startCacheConfiguration(cfg);
-        req.template(cfg.getName() != null && cfg.getName().endsWith("*"));
+        req.template(cfg.getName().endsWith("*"));
         req.nearCacheConfiguration(cfg.getNearConfiguration());
         req.deploymentId(IgniteUuid.randomUuid());
         req.schema(new QuerySchema(cfg.getQueryEntities()));
@@ -2910,7 +2507,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         Collection<DynamicCacheChangeRequest> sndReqs = new ArrayList<>(reqs.size());
 
         for (DynamicCacheChangeRequest req : reqs) {
-            DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req.deploymentId(), req);
+            DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req);
 
             try {
                 if (req.stop() || req.close()) {
@@ -2927,14 +2524,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                             req.stop(true);
                         }
-
-                        IgniteUuid dynamicDeploymentId = desc.deploymentId();
-
-                        assert dynamicDeploymentId != null : desc;
-
-                        // Save deployment ID to avoid concurrent stops.
-                        req.deploymentId(dynamicDeploymentId);
-                        fut.deploymentId = dynamicDeploymentId;
                     }
                 }
 
@@ -2944,7 +2533,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 DynamicCacheStartFuture old = (DynamicCacheStartFuture)pendingFuts.putIfAbsent(
                     req.requestId(), fut);
 
-                assert old == null; //TODO : check failIfExists.
+                assert old == null;
 
                 if (fut.isDone())
                     continue;
@@ -2993,12 +2582,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param topVer Topology version.
      */
     public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
-        if (type == EVT_NODE_JOINED) {
-            for (DynamicCacheDescriptor cacheDesc : cacheDescriptors()) {
-                if (node.id().equals(cacheDesc.receivedFrom()))
-                    cacheDesc.receivedFromStartVersion(topVer);
-            }
-        }
+        cachesInfo.onDiscoveryEvent(type, node, topVer);
 
         sharedCtx.affinity().onDiscoveryEvent(type, node, topVer);
     }
@@ -3025,198 +2609,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             return true;
 
         if (msg instanceof DynamicCacheChangeBatch)
-            return onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer);
+            return cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer);
 
         return false;
     }
 
     /**
-     * @param batch Change request batch.
-     * @param topVer Current topology version.
-     * @return {@code True} if minor topology version should be increased.
-     */
-    private boolean onCacheChangeRequested(
-        DynamicCacheChangeBatch batch,
-        AffinityTopologyVersion topVer
-    ) {
-        AffinityTopologyVersion newTopVer = null;
-
-        boolean incMinorTopVer = false;
-
-        for (DynamicCacheChangeRequest req : batch.requests()) {
-            initReceivedCacheConfiguration(req);
-
-            if (req.template()) {
-                CacheConfiguration ccfg = req.startCacheConfiguration();
-
-                assert ccfg != null : req;
-
-                DynamicCacheDescriptor desc = registeredTemplates.get(req.cacheName());
-
-                if (desc == null) {
-                    DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), true,
-                        req.deploymentId(), req.schema());
-
-                    DynamicCacheDescriptor old = registeredTemplates.put(ccfg.getName(), templateDesc);
-
-                    assert old == null :
-                        "Dynamic cache map was concurrently modified [new=" + templateDesc + ", old=" + old + ']';
-                }
-
-                TemplateConfigurationFuture fut =
-                    (TemplateConfigurationFuture)pendingTemplateFuts.get(ccfg.getName());
-
-                if (fut != null && fut.deploymentId().equals(req.deploymentId()))
-                    fut.onDone();
-
-                continue;
-            }
-
-            DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName());
-
-            DynamicCacheStartFuture fut = null;
-
-            if (ctx.localNodeId().equals(req.initiatingNodeId())) {
-                fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
-
-                if (fut != null && !F.eq(req.deploymentId(), fut.deploymentId()))
-                    fut = null;
-            }
-
-            boolean needExchange = false;
-
-            if (req.start()) {
-                if (desc == null) {
-                    if (req.clientStartOnly()) {
-                        if (fut != null)
-                            fut.onDone(new IgniteCheckedException("Failed to start client cache " +
-                                "(a cache with the given name is not started): " + U.maskName(req.cacheName())));
-                    }
-                    else {
-                        CacheConfiguration ccfg = req.startCacheConfiguration();
-
-                        assert req.cacheType() != null : req;
-                        assert F.eq(ccfg.getName(), req.cacheName()) : req;
-
-                        DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false,
-                            req.deploymentId(), req.schema());
-
-                        if (newTopVer == null) {
-                            newTopVer = new AffinityTopologyVersion(topVer.topologyVersion(),
-                                topVer.minorTopologyVersion() + 1);
-                        }
-
-                        startDesc.startTopologyVersion(newTopVer);
-
-                        DynamicCacheDescriptor old = cacheDescriptor(ccfg.getName(), startDesc);
-
-                        assert old == null :
-                            "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']';
-
-                        ctx.discovery().setCacheFilter(
-                            ccfg.getName(),
-                            ccfg.getNodeFilter(),
-                            ccfg.getNearConfiguration() != null,
-                            ccfg.getCacheMode());
-
-                        ctx.discovery().addClientNode(req.cacheName(),
-                            req.initiatingNodeId(),
-                            req.nearCacheConfiguration() != null);
-
-                        needExchange = true;
-                    }
-                }
-                else {
-                    assert req.initiatingNodeId() != null : req;
-
-                    // Cache already exists, exchange is needed only if client cache should be created.
-                    ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
-
-                    boolean clientReq = node != null &&
-                        !ctx.discovery().cacheAffinityNode(node, req.cacheName());
-
-                    if (req.clientStartOnly()) {
-                        needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
-                            req.initiatingNodeId(),
-                            req.nearCacheConfiguration() != null);
-                    }
-                    else {
-                        if (req.failIfExists()) {
-                            if (fut != null)
-                                fut.onDone(new CacheExistsException("Failed to start cache " +
-                                    "(a cache with the same name is already started): " + U.maskName(req.cacheName())));
-                        }
-                        else {
-                            needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
-                                req.initiatingNodeId(),
-                                req.nearCacheConfiguration() != null);
-
-                            if (needExchange)
-                                req.clientStartOnly(true);
-                        }
-                    }
-
-                    if (needExchange) {
-                        if (newTopVer == null) {
-                            newTopVer = new AffinityTopologyVersion(topVer.topologyVersion(),
-                                topVer.minorTopologyVersion() + 1);
-                        }
-
-                        desc.clientCacheStartVersion(newTopVer);
-                    }
-                }
-
-                if (!needExchange && desc != null) {
-                    if (desc.clientCacheStartVersion() != null)
-                        req.cacheFutureTopologyVersion(desc.clientCacheStartVersion());
-                    else
-                        req.cacheFutureTopologyVersion(desc.startTopologyVersion());
-                }
-            }
-            else if (req.globalStateChange() || req.resetLostPartitions())
-                needExchange = true;
-            else {
-                assert req.stop() ^ req.close() : req;
-
-                if (desc != null) {
-                    if (req.stop()) {
-                        DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName());
-
-                        assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
-
-                        ctx.discovery().removeCacheFilter(req.cacheName());
-
-                        needExchange = true;
-                    }
-                    else {
-                        assert req.close() : req;
-
-                        needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
-                    }
-                }
-            }
-
-            req.exchangeNeeded(needExchange);
-
-            incMinorTopVer |= needExchange;
-        }
-
-        return incMinorTopVer;
-    }
-
-    /**
-     * @param req Cache change request.
-     */
-    private void initReceivedCacheConfiguration(DynamicCacheChangeRequest req) {
-        if (req.startCacheConfiguration() != null) {
-            CacheConfiguration ccfg = req.startCacheConfiguration();
-
-            if (ccfg.isStoreKeepBinary() == null)
-                ccfg.setStoreKeepBinary(CacheConfiguration.DFLT_STORE_KEEP_BINARY);
-        }
-    }
-
-    /**
      * Checks that preload-order-dependant caches has SYNC or ASYNC preloading mode.
      *
      * @param cfgs Caches.
@@ -3291,110 +2689,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Checks that remote caches has configuration compatible with the local.
-     *
-     * @param locCfg Local configuration.
-     * @param rmtCfg Remote configuration.
-     * @param rmtNode Remote node.
-     * @throws IgniteCheckedException If check failed.
-     */
-    private void checkCache(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode) throws IgniteCheckedException {
-        ClusterNode locNode = ctx.discovery().localNode();
-
-        UUID rmt = rmtNode.id();
-
-        GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtCfg);
-        GridCacheAttributes locAttr = new GridCacheAttributes(locCfg);
-
-        boolean isLocAff = CU.affinityNode(locNode, locCfg.getNodeFilter());
-        boolean isRmtAff = CU.affinityNode(rmtNode, rmtCfg.getNodeFilter());
-
-        CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode",
-            locAttr.cacheMode(), rmtAttr.cacheMode(), true);
-
-        if (rmtAttr.cacheMode() != LOCAL) {
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "interceptor", "Cache Interceptor",
-                locAttr.interceptorClassName(), rmtAttr.interceptorClassName(), true);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "atomicityMode",
-                "Cache atomicity mode", locAttr.atomicityMode(), rmtAttr.atomicityMode(), true);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode",
-                "Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true);
-
-            boolean checkStore = isLocAff && isRmtAff;
-
-            if (checkStore)
-                CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeFactory", "Store factory",
-                    locAttr.storeFactoryClassName(), rmtAttr.storeFactoryClassName(), true);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinity", "Cache affinity",
-                locAttr.cacheAffinityClassName(), rmtAttr.cacheAffinityClassName(), true);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinityMapper",
-                "Cache affinity mapper", locAttr.cacheAffinityMapperClassName(),
-                rmtAttr.cacheAffinityMapperClassName(), true);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityPartitionsCount",
-                "Affinity partitions count", locAttr.affinityPartitionsCount(),
-                rmtAttr.affinityPartitionsCount(), true);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionFilter", "Eviction filter",
-                locAttr.evictionFilterClassName(), rmtAttr.evictionFilterClassName(), true);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionPolicy", "Eviction policy",
-                locAttr.evictionPolicyClassName(), rmtAttr.evictionPolicyClassName(), true);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "transactionManagerLookup",
-                "Transaction manager lookup", locAttr.transactionManagerLookupClassName(),
-                rmtAttr.transactionManagerLookupClassName(), false);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "defaultLockTimeout",
-                "Default lock timeout", locAttr.defaultLockTimeout(), rmtAttr.defaultLockTimeout(), false);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "preloadBatchSize",
-                "Preload batch size", locAttr.rebalanceBatchSize(), rmtAttr.rebalanceBatchSize(), false);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeSynchronizationMode",
-                "Write synchronization mode", locAttr.writeSynchronization(), rmtAttr.writeSynchronization(),
-                true);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindBatchSize",
-                "Write behind batch size", locAttr.writeBehindBatchSize(), rmtAttr.writeBehindBatchSize(),
-                false);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindEnabled",
-                "Write behind enabled", locAttr.writeBehindEnabled(), rmtAttr.writeBehindEnabled(), false);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushFrequency",
-                "Write behind flush frequency", locAttr.writeBehindFlushFrequency(),
-                rmtAttr.writeBehindFlushFrequency(), false);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushSize",
-                "Write behind flush size", locAttr.writeBehindFlushSize(), rmtAttr.writeBehindFlushSize(),
-                false);
-
-            CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushThreadCount",
-                "Write behind flush thread count", locAttr.writeBehindFlushThreadCount(),
-                rmtAttr.writeBehindFlushThreadCount(), false);
-
-            if (locAttr.cacheMode() == PARTITIONED) {
-                CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "nearEvictionPolicy",
-                    "Near eviction policy", locAttr.nearEvictionPolicyClassName(),
-                    rmtAttr.nearEvictionPolicyClassName(), false);
-
-                CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityIncludeNeighbors",
-                    "Affinity include neighbors", locAttr.affinityIncludeNeighbors(),
-                    rmtAttr.affinityIncludeNeighbors(), true);
-
-                CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityKeyBackups",
-                    "Affinity key backups", locAttr.affinityKeyBackups(),
-                    rmtAttr.affinityKeyBackups(), true);
-            }
-        }
-    }
-
-    /**
      * @param rmt Remote node to check.
      * @throws IgniteCheckedException If check failed.
      */
@@ -3635,27 +2929,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Descriptor.
      */
     public DynamicCacheDescriptor cacheDescriptor(String name) {
-        return name != null ? registeredCaches.get(name) : null;
-    }
-
-    /**
-     * Put registered cache descriptor.
-     *
-     * @param name Name.
-     * @param desc Descriptor.
-     * @return Old descriptor (if any).
-     */
-    private DynamicCacheDescriptor cacheDescriptor(String name, DynamicCacheDescriptor desc) {
-        assert name != null;
-
-        return registeredCaches.put(name, desc);
+        return cachesInfo.registeredCaches().get(name);
     }
 
     /**
      * @return Cache descriptors.
      */
     public Collection<DynamicCacheDescriptor> cacheDescriptors() {
-        return registeredCaches.values();
+        return cachesInfo.registeredCaches().values();
     }
 
     /**
@@ -3682,23 +2963,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     public void addCacheConfiguration(CacheConfiguration cacheCfg) throws IgniteCheckedException {
         assert cacheCfg.getName() != null;
 
-        String masked = cacheCfg.getName();
+        String name = cacheCfg.getName();
 
-        DynamicCacheDescriptor desc = registeredTemplates.get(masked);
+        DynamicCacheDescriptor desc = cachesInfo.registeredTemplates().get(name);
 
         if (desc != null)
             return;
 
-        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheCfg.getName(), ctx.localNodeId());
-
-        CacheConfiguration cfg = new CacheConfiguration(cacheCfg);
-
-        req.template(true);
-
-        req.startCacheConfiguration(cfg);
-        req.schema(new QuerySchema(cfg.getQueryEntities()));
-
-        req.deploymentId(IgniteUuid.randomUuid());
+        DynamicCacheChangeRequest req = DynamicCacheChangeRequest.addTemplateRequest(ctx, cacheCfg);
 
         TemplateConfigurationFuture fut = new TemplateConfigurationFuture(req.cacheName(), req.deploymentId());
 
@@ -3749,6 +3021,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param name Cache name.
+     * @return Cache proxy.
+     */
+    @Nullable public IgniteCacheProxy jcacheProxy(String name) {
+        return jCacheProxies.get(name);
+    }
+
+    /**
      * @return All configured public cache instances.
      */
     public Collection<IgniteCacheProxy<?, ?>> publicCaches() {
@@ -3859,7 +3139,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException In case of error.
      */
     public void createMissingQueryCaches() throws IgniteCheckedException {
-        for (Map.Entry<String, DynamicCacheDescriptor> e : registeredCaches.entrySet()) {
+        for (Map.Entry<String, DynamicCacheDescriptor> e : cachesInfo.registeredCaches().entrySet()) {
             DynamicCacheDescriptor desc = e.getValue();
 
             if (isMissingQueryCache(desc))
@@ -4171,10 +3451,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      */
     @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
     private class DynamicCacheStartFuture extends GridFutureAdapter<Object> {
-        /** Start ID. */
-        @GridToStringInclude
-        private IgniteUuid deploymentId;
-
         /** Cache name. */
         private String cacheName;
 
@@ -4184,23 +3460,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         /**
          * @param cacheName Cache name.
-         * @param deploymentId Deployment ID.
          * @param req Cache start request.
          */
-        private DynamicCacheStartFuture(String cacheName, IgniteUuid deploymentId, DynamicCacheChangeRequest req) {
-            this.deploymentId = deploymentId;
+        private DynamicCacheStartFuture(String cacheName, DynamicCacheChangeRequest req) {
             this.cacheName = cacheName;
             this.req = req;
         }
 
         /**
-         * @return Start ID.
-         */
-        public IgniteUuid deploymentId() {
-            return deploymentId;
-        }
-
-        /**
          * @return Request.
          */
         public DynamicCacheChangeRequest request() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index f3c3a1b..1de64c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -281,7 +281,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         long updateSeq = this.updateSeq.incrementAndGet();
 
         // If this is the oldest node.
-        if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
+        if (oldest.id().equals(loc.id()) || exchFut.dynamicCacheStarted(cacheId)) {
             if (node2part == null) {
                 node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index 94f11ed..f80adc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@ -23,7 +23,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.NotNull;
 
 /**
  * Affinity assignment request.
@@ -32,6 +31,9 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private long futId;
+
     /** Topology version being queried. */
     private AffinityTopologyVersion topVer;
 
@@ -43,14 +45,28 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
     }
 
     /**
+     * @param futId Future ID.
      * @param cacheId Cache ID.
      * @param topVer Topology version.
      */
-    public GridDhtAffinityAssignmentRequest(int cacheId, @NotNull AffinityTopologyVersion topVer) {
+    public GridDhtAffinityAssignmentRequest(
+        long futId,
+        int cacheId,
+        AffinityTopologyVersion topVer) {
+        assert topVer != null;
+
+        this.futId = futId;
         this.cacheId = cacheId;
         this.topVer = topVer;
     }
 
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean addDeploymentInfo() {
         return false;
@@ -75,7 +91,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 4;
+        return 5;
     }
 
     /** {@inheritDoc} */
@@ -94,6 +110,12 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
 
         switch (writer.state()) {
             case 3:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -116,6 +138,14 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
 
         switch (reader.state()) {
             case 3:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())


[4/6] ignite git commit: Moved logic related to caches discovery data handling to ClusterCachesInfo. Start of statically configured caches in the same way as dynamic ones: from GridDhtPartitionsExchangeFuture.

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 66e780f..3c65326 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -18,10 +18,9 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
@@ -33,29 +32,22 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Discovery custom message ID. */
+    private IgniteUuid id = IgniteUuid.randomUuid();
+
     /** Change requests. */
     @GridToStringInclude
     private Collection<DynamicCacheChangeRequest> reqs;
 
-    /** Client nodes map. Used in discovery data exchange. */
-    @GridToStringInclude
-    private Map<String, Map<UUID, Boolean>> clientNodes;
-
-    /** Custom message ID. */
-    private IgniteUuid id = IgniteUuid.randomUuid();
-
-    /** */
-    private boolean clientReconnect;
-
-    /** */
-    private boolean startCaches;
+    /** Cache updates to be executed on exchange. */
+    private transient ExchangeActions exchangeActions;
 
     /**
      * @param reqs Requests.
      */
-    public DynamicCacheChangeBatch(
-        Collection<DynamicCacheChangeRequest> reqs
-    ) {
+    public DynamicCacheChangeBatch(Collection<DynamicCacheChangeRequest> reqs) {
+        assert !F.isEmpty(reqs) : reqs;
+
         this.reqs = reqs;
     }
 
@@ -64,34 +56,6 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
         return id;
     }
 
-    /**
-     * @param id Message ID.
-     */
-    public void id(IgniteUuid id) {
-        this.id = id;
-    }
-
-    /**
-     * @return Collection of change requests.
-     */
-    public Collection<DynamicCacheChangeRequest> requests() {
-        return reqs;
-    }
-
-    /**
-     * @return Client nodes map.
-     */
-    public Map<String, Map<UUID, Boolean>> clientNodes() {
-        return clientNodes;
-    }
-
-    /**
-     * @param clientNodes Client nodes map.
-     */
-    public void clientNodes(Map<String, Map<UUID, Boolean>> clientNodes) {
-        this.clientNodes = clientNodes;
-    }
-
     /** {@inheritDoc} */
     @Nullable @Override public DiscoveryCustomMessage ackMessage() {
         return null;
@@ -103,45 +67,33 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     }
 
     /**
-     * @param clientReconnect {@code True} if this is discovery data sent on client reconnect.
-     */
-    public void clientReconnect(boolean clientReconnect) {
-        this.clientReconnect = clientReconnect;
-    }
-
-    /**
-     * @return {@code True} if this is discovery data sent on client reconnect.
+     * @return Collection of change requests.
      */
-    public boolean clientReconnect() {
-        return clientReconnect;
+    public Collection<DynamicCacheChangeRequest> requests() {
+        return reqs;
     }
 
     /**
-     * @return {@code True} if required to start all caches on client node.
+     * @return {@code True} if request should trigger partition exchange.
      */
-    public boolean startCaches() {
-        return startCaches;
+    public boolean exchangeNeeded() {
+        return exchangeActions != null;
     }
 
     /**
-     * @param startCaches {@code True} if required to start all caches on client node.
+     * @return Cache updates to be executed on exchange.
      */
-    public void startCaches(boolean startCaches) {
-        this.startCaches = startCaches;
+    ExchangeActions exchangeActions() {
+        return exchangeActions;
     }
 
     /**
-     * @return {@code True} if request should trigger partition exchange.
+     * @param exchangeActions Cache updates to be executed on exchange.
      */
-    public boolean exchangeNeeded() {
-        if (reqs != null) {
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (req.exchangeNeeded())
-                    return true;
-            }
-        }
+    void exchangeActions(ExchangeActions exchangeActions) {
+        assert exchangeActions != null && !exchangeActions.empty() : exchangeActions;
 
-        return false;
+        this.exchangeActions = exchangeActions;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 9d2563d..f8c2c7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.QuerySchema;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -88,78 +88,114 @@ public class DynamicCacheChangeRequest implements Serializable {
     /** Dynamic schema. */
     private QuerySchema schema;
 
-    /** */
-    private transient boolean exchangeNeeded;
-
-    /** */
-    private transient AffinityTopologyVersion cacheFutTopVer;
-
     /**
-     * Constructor creates cache stop request.
-     *
+     * @param reqId Unique request ID.
      * @param cacheName Cache stop name.
      * @param initiatingNodeId Initiating node ID.
      */
     public DynamicCacheChangeRequest(UUID reqId, String cacheName, UUID initiatingNodeId) {
+        assert reqId != null;
+        assert cacheName != null;
+        assert initiatingNodeId != null;
+
         this.reqId = reqId;
         this.cacheName = cacheName;
         this.initiatingNodeId = initiatingNodeId;
     }
 
     /**
-     * @return Request ID.
+     * @param reqId Unique request ID.
+     * @param state New cluster state.
+     * @param initiatingNodeId Initiating node ID.
      */
-    public UUID requestId() {
-        return reqId;
+    public DynamicCacheChangeRequest(UUID reqId, ClusterState state, UUID initiatingNodeId) {
+        assert reqId != null;
+        assert state != null;
+        assert initiatingNodeId != null;
+
+        this.reqId = reqId;
+        this.state = state;
+        this.initiatingNodeId = initiatingNodeId;
     }
 
     /**
-     * @return {@code True} if request should trigger partition exchange.
+     * @param ctx Context.
+     * @param cacheName Cache name.
+     * @return Request to reset lost partitions.
      */
-    public boolean exchangeNeeded() {
-        return exchangeNeeded;
+    static DynamicCacheChangeRequest resetLostPartitions(GridKernalContext ctx, String cacheName) {
+        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
+
+        req.markResetLostPartitions();
+
+        return req;
     }
 
     /**
-     * @return State.
+     * @param ctx Context.
+     * @param cfg0 Template configuration.
+     * @return Request to add template.
      */
-    public ClusterState state() {
-        return state;
+    static DynamicCacheChangeRequest addTemplateRequest(GridKernalContext ctx, CacheConfiguration<?, ?> cfg0) {
+        CacheConfiguration<?, ?> cfg = new CacheConfiguration<>(cfg0);
+
+        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cfg.getName(), ctx.localNodeId());
+
+        req.template(true);
+        req.startCacheConfiguration(cfg);
+        req.schema(new QuerySchema(cfg.getQueryEntities()));
+        req.deploymentId(IgniteUuid.randomUuid());
+
+        return req;
     }
 
     /**
-     * @param state State.
+     * @param ctx Context.
+     * @param cacheName Cache name.
+     * @return Request to close client cache.
      */
-    public void state(ClusterState state) {
-        this.state = state;
+    static DynamicCacheChangeRequest closeRequest(GridKernalContext ctx, String cacheName) {
+        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
+
+        req.close(true);
+
+        return req;
     }
 
     /**
-     * @return {@code True} if global caches state is changes.
+     * @param ctx Context.
+     * @param cacheName Cache name.
+     * @param destroy Destroy flag.
+     * @return Cache stop request.
      */
-    public boolean globalStateChange() {
-        return state != null;
+    static DynamicCacheChangeRequest stopRequest(GridKernalContext ctx, String cacheName, boolean destroy) {
+        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
+
+        req.stop(true);
+        req.destroy(destroy);
+
+        return req;
     }
 
     /**
-     * @param cacheFutTopVer Ready topology version when dynamic cache future should be completed.
+     * @return Request ID.
      */
-    public void cacheFutureTopologyVersion(AffinityTopologyVersion cacheFutTopVer) {
-        this.cacheFutTopVer = cacheFutTopVer;
+    public UUID requestId() {
+        return reqId;
     }
 
     /**
-     * @return Ready topology version when dynamic cache future should be completed.
+     * @return State.
      */
-    @Nullable public AffinityTopologyVersion cacheFutureTopologyVersion() {
-        return cacheFutTopVer;
+    public ClusterState state() {
+        return state;
     }
 
     /**
-     * @param exchangeNeeded {@code True} if request should trigger partition exchange.
+     * @return {@code True} if global caches state is changed.
      */
-    public void exchangeNeeded(boolean exchangeNeeded) {
-        this.exchangeNeeded = exchangeNeeded;
+    public boolean globalStateChange() {
+        return state != null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 09b4c3a..40d3706 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -17,10 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -46,21 +45,12 @@ public class DynamicCacheDescriptor {
     @GridToStringExclude
     private CacheConfiguration cacheCfg;
 
-    /** Locally configured flag. */
-    private boolean locCfg;
-
     /** Statically configured flag. */
-    private boolean staticCfg;
-
-    /** Started flag. */
-    private boolean started;
+    private final boolean staticCfg;
 
     /** Cache type. */
     private CacheType cacheType;
 
-    /** */
-    private volatile Map<UUID, CacheConfiguration> rmtCfgs;
-
     /** Template configuration flag. */
     private boolean template;
 
@@ -71,19 +61,10 @@ public class DynamicCacheDescriptor {
     private boolean updatesAllowed = true;
 
     /** */
-    private AffinityTopologyVersion startTopVer;
-
-    /** */
-    private boolean rcvdOnDiscovery;
-
-    /** */
     private Integer cacheId;
 
     /** */
-    private UUID rcvdFrom;
-
-    /** */
-    private AffinityTopologyVersion rcvdFromVer;
+    private final UUID rcvdFrom;
 
     /** Mutex. */
     private final Object mux = new Object();
@@ -92,7 +73,16 @@ public class DynamicCacheDescriptor {
     private volatile CacheObjectContext objCtx;
 
     /** */
-    private transient AffinityTopologyVersion clientCacheStartVer;
+    private boolean rcvdOnDiscovery;
+
+    /** */
+    private AffinityTopologyVersion startTopVer;
+
+    /** */
+    private AffinityTopologyVersion rcvdFromVer;
+
+    /** */
+    private AffinityTopologyVersion clientCacheStartVer;
 
     /** Mutex to control schema. */
     private final Object schemaMux = new Object();
@@ -105,21 +95,34 @@ public class DynamicCacheDescriptor {
      * @param cacheCfg Cache configuration.
      * @param cacheType Cache type.
      * @param template {@code True} if this is template configuration.
+     * @param rcvdFrom ID of the node that provided the cache configuration.
+     * @param staticCfg {@code True} if cache statically configured.
      * @param deploymentId Deployment ID.
+     * @param schema Query schema.
      */
     @SuppressWarnings("unchecked")
     public DynamicCacheDescriptor(GridKernalContext ctx,
         CacheConfiguration cacheCfg,
         CacheType cacheType,
         boolean template,
+        UUID rcvdFrom,
+        boolean staticCfg,
         IgniteUuid deploymentId,
         QuerySchema schema) {
         assert cacheCfg != null;
         assert schema != null;
 
+        if (cacheCfg.getCacheMode() == CacheMode.REPLICATED && cacheCfg.getNearConfiguration() != null) {
+            cacheCfg = new CacheConfiguration(cacheCfg);
+
+            cacheCfg.setNearConfiguration(null);
+        }
+
         this.cacheCfg = cacheCfg;
         this.cacheType = cacheType;
         this.template = template;
+        this.rcvdFrom = rcvdFrom;
+        this.staticCfg = staticCfg;
         this.deploymentId = deploymentId;
 
         pluginMgr = new CachePluginManager(ctx, cacheCfg);
@@ -139,20 +142,6 @@ public class DynamicCacheDescriptor {
     }
 
     /**
-     * @return Start topology version.
-     */
-    @Nullable public AffinityTopologyVersion startTopologyVersion() {
-        return startTopVer;
-    }
-
-    /**
-     * @param startTopVer Start topology version.
-     */
-    public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
-        this.startTopVer = startTopVer;
-    }
-
-    /**
      * @return {@code True} if this is template configuration.
      */
     public boolean template() {
@@ -174,27 +163,6 @@ public class DynamicCacheDescriptor {
     }
 
     /**
-     * @param deploymentId Deployment ID.
-     */
-    public void deploymentId(IgniteUuid deploymentId) {
-        this.deploymentId = deploymentId;
-    }
-
-    /**
-     * @return Locally configured flag.
-     */
-    public boolean locallyConfigured() {
-        return locCfg;
-    }
-
-    /**
-     * @param locCfg Locally configured flag.
-     */
-    public void locallyConfigured(boolean locCfg) {
-        this.locCfg = locCfg;
-    }
-
-    /**
      * @return {@code True} if statically configured.
      */
     public boolean staticallyConfigured() {
@@ -202,30 +170,12 @@ public class DynamicCacheDescriptor {
     }
 
     /**
-     * @param staticCfg {@code True} if statically configured.
+     * @return Cache name.
      */
-    public void staticallyConfigured(boolean staticCfg) {
-        this.staticCfg = staticCfg;
-    }
+    public String cacheName() {
+        assert cacheCfg != null : this;
 
-    /**
-     * @return {@code True} if started flag was flipped by this call.
-     */
-    public boolean onStart() {
-        if (!started) {
-            started = true;
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * @return Started flag.
-     */
-    public boolean started() {
-        return started;
+        return cacheCfg.getName();
     }
 
     /**
@@ -239,6 +189,7 @@ public class DynamicCacheDescriptor {
      * Creates and caches cache object context if needed.
      *
      * @param proc Object processor.
+     * @return Cache object context.
      */
     public CacheObjectContext cacheObjectContext(IgniteCacheObjectProcessor proc) throws IgniteCheckedException {
         if (objCtx == null) {
@@ -259,36 +210,6 @@ public class DynamicCacheDescriptor {
     }
 
     /**
-     * @param nodeId Remote node ID.
-     * @return Configuration.
-     */
-    public CacheConfiguration remoteConfiguration(UUID nodeId) {
-        Map<UUID, CacheConfiguration> cfgs = rmtCfgs;
-
-        return cfgs == null ? null : cfgs.get(nodeId);
-    }
-
-    /**
-     * @param nodeId Remote node ID.
-     * @param cfg Remote node configuration.
-     */
-    public void addRemoteConfiguration(UUID nodeId, CacheConfiguration cfg) {
-        Map<UUID, CacheConfiguration> cfgs = rmtCfgs;
-
-        if (cfgs == null)
-            rmtCfgs = cfgs = new HashMap<>();
-
-        cfgs.put(nodeId, cfg);
-    }
-
-    /**
-     *
-     */
-    public void clearRemoteConfigurations() {
-        rmtCfgs = null;
-    }
-
-    /**
      * @return Updates allowed flag.
      */
     public boolean updatesAllowed() {
@@ -305,43 +226,51 @@ public class DynamicCacheDescriptor {
     /**
      * @return {@code True} if received in discovery data.
      */
-    public boolean receivedOnDiscovery() {
+    boolean receivedOnDiscovery() {
         return rcvdOnDiscovery;
     }
 
     /**
      * @param rcvdOnDiscovery {@code True} if received in discovery data.
      */
-    public void receivedOnDiscovery(boolean rcvdOnDiscovery) {
+    void receivedOnDiscovery(boolean rcvdOnDiscovery) {
         this.rcvdOnDiscovery = rcvdOnDiscovery;
     }
 
     /**
-     * @param nodeId ID of node provided cache configuration in discovery data.
+     * @return ID of the node that provided cache configuration in discovery data.
      */
-    public void receivedFrom(UUID nodeId) {
-        rcvdFrom = nodeId;
+    @Nullable public UUID receivedFrom() {
+        return rcvdFrom;
     }
 
     /**
      * @return Topology version when node provided cache configuration was started.
      */
-    @Nullable public AffinityTopologyVersion receivedFromStartVersion() {
+    @Nullable AffinityTopologyVersion receivedFromStartVersion() {
         return rcvdFromVer;
     }
 
     /**
      * @param rcvdFromVer Topology version when node provided cache configuration was started.
      */
-    public void receivedFromStartVersion(AffinityTopologyVersion rcvdFromVer) {
+    void receivedFromStartVersion(AffinityTopologyVersion rcvdFromVer) {
         this.rcvdFromVer = rcvdFromVer;
     }
 
+
     /**
-     * @return ID of node provided cache configuration in discovery data.
+     * @return Start topology version.
      */
-    @Nullable public UUID receivedFrom() {
-        return rcvdFrom;
+    @Nullable public AffinityTopologyVersion startTopologyVersion() {
+        return startTopVer;
+    }
+
+    /**
+     * @param startTopVer Start topology version.
+     */
+    public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
+        this.startTopVer = startTopVer;
     }
 
     /**
@@ -354,7 +283,7 @@ public class DynamicCacheDescriptor {
     /**
      * @param clientCacheStartVer Version when client cache on local node was started.
      */
-    public void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) {
+    void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) {
         this.clientCacheStartVer = clientCacheStartVer;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
new file mode 100644
index 0000000..eac1120
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cache change requests to execute when a {@link DynamicCacheChangeBatch} event is received.
+ */
+public class ExchangeActions {
+    /** */
+    private Map<String, ActionData> cachesToStart;
+
+    /** */
+    private Map<String, ActionData> clientCachesToStart;
+
+    /** */
+    private Map<String, ActionData> cachesToStop;
+
+    /** */
+    private Map<String, ActionData> cachesToClose;
+
+    /** */
+    private Map<String, ActionData> cachesToResetLostParts;
+
+    /** */
+    private ClusterState newState;
+
+    /**
+     * @return {@code True} if server nodes should not participate in exchange.
+     */
+    boolean clientOnlyExchange() {
+        return F.isEmpty(cachesToStart) &&
+            F.isEmpty(cachesToStop) &&
+            F.isEmpty(cachesToResetLostParts);
+    }
+
+    /**
+     * @param nodeId Local node ID.
+     * @return Close cache requests.
+     */
+    List<DynamicCacheChangeRequest> closeRequests(UUID nodeId) {
+        List<DynamicCacheChangeRequest> res = null;
+
+        if (cachesToClose != null) {
+            for (ActionData req : cachesToClose.values()) {
+                if (nodeId.equals(req.req.initiatingNodeId())) {
+                    if (res == null)
+                        res = new ArrayList<>(cachesToClose.size());
+
+                    res.add(req.req);
+                }
+            }
+        }
+
+        return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
+    }
+
+    /**
+     * @return New caches start requests.
+     */
+    Collection<ActionData> cacheStartRequests() {
+        return cachesToStart != null ? cachesToStart.values() : Collections.<ActionData>emptyList();
+    }
+
+    /**
+     * @return Start cache requests.
+     */
+    Collection<ActionData> newAndClientCachesStartRequests() {
+        if (cachesToStart != null || clientCachesToStart != null) {
+            List<ActionData> res = new ArrayList<>();
+
+            if (cachesToStart != null)
+                res.addAll(cachesToStart.values());
+
+            if (clientCachesToStart != null)
+                res.addAll(clientCachesToStart.values());
+
+            return res;
+        }
+
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return Stop cache requests.
+     */
+    Collection<ActionData> cacheStopRequests() {
+        return cachesToStop != null ? cachesToStop.values() : Collections.<ActionData>emptyList();
+    }
+
+    /**
+     * @param ctx Context.
+     */
+    public void completeRequestFutures(GridCacheSharedContext ctx) {
+        completeRequestFutures(cachesToStart, ctx);
+        completeRequestFutures(cachesToStop, ctx);
+        completeRequestFutures(cachesToClose, ctx);
+        completeRequestFutures(clientCachesToStart, ctx);
+        completeRequestFutures(cachesToResetLostParts, ctx);
+    }
+
+    /**
+     * @param map Actions map.
+     * @param ctx Context.
+     */
+    private void completeRequestFutures(Map<String, ActionData> map, GridCacheSharedContext ctx) {
+        if (map != null) {
+            for (ActionData req : map.values())
+                ctx.cache().completeCacheStartFuture(req.req, null);
+        }
+    }
+
+    /**
+     * @return {@code True} if there are cache stop requests.
+     */
+    public boolean hasStop() {
+        return !F.isEmpty(cachesToStop);
+    }
+
+    /**
+     * @return Caches to reset lost partitions for.
+     */
+    public Set<String> cachesToResetLostPartitions() {
+        Set<String> caches = null;
+        
+        if (cachesToResetLostParts != null)
+            caches = new HashSet<>(cachesToResetLostParts.keySet());
+
+        return caches != null ? caches : Collections.<String>emptySet();
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @return {@code True} if cache stop was requested.
+     */
+    public boolean cacheStopped(int cacheId) {
+        if (cachesToStop != null) {
+            for (ActionData cache : cachesToStop.values()) {
+                if (cache.desc.cacheId() == cacheId)
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @return {@code True} if cache start was requested.
+     */
+    public boolean cacheStarted(int cacheId) {
+        if (cachesToStart != null) {
+            for (ActionData cache : cachesToStart.values()) {
+                if (cache.desc.cacheId() == cacheId)
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * @param nodeId Local node ID.
+     * @return {@code True} if client cache was started.
+     */
+    public boolean clientCacheStarted(UUID nodeId) {
+        if (clientCachesToStart != null) {
+            for (ActionData cache : clientCachesToStart.values()) {
+                if (nodeId.equals(cache.req.initiatingNodeId()))
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * @param state New cluster state.
+     */
+    void newClusterState(ClusterState state) {
+        assert state != null;
+
+        newState = state;
+    }
+
+    /**
+     * @return New cluster state if state change was requested.
+     */
+    @Nullable public ClusterState newClusterState() {
+        return newState;
+    }
+
+    /**
+     * @param map Actions map.
+     * @param req Request.
+     * @param desc Cache descriptor.
+     * @return Actions map.
+     */
+    private Map<String, ActionData> add(Map<String, ActionData> map,
+        DynamicCacheChangeRequest req,
+        DynamicCacheDescriptor desc) {
+        assert req != null;
+        assert desc != null;
+
+        if (map == null)
+            map = new HashMap<>();
+
+        ActionData old = map.put(req.cacheName(), new ActionData(req, desc));
+
+        assert old == null : old;
+
+        return map;
+    }
+
+    /**
+     * @param req Request.
+     * @param desc Cache descriptor.
+     */
+    void addCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        assert req.start() : req;
+
+        cachesToStart = add(cachesToStart, req, desc);
+    }
+
+    /**
+     * @param req Request.
+     * @param desc Cache descriptor.
+     */
+    void addClientCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        assert req.start() : req;
+
+        clientCachesToStart = add(clientCachesToStart, req, desc);
+    }
+
+    /**
+     * @param req Request.
+     * @param desc Cache descriptor.
+     */
+    void addCacheToStop(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        assert req.stop() : req;
+
+        cachesToStop = add(cachesToStop, req, desc);
+    }
+
+    /**
+     * @param req Request.
+     * @param desc Cache descriptor.
+     */
+    void addCacheToClose(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        assert req.close() : req;
+
+        cachesToClose = add(cachesToClose, req, desc);
+    }
+
+    /**
+     * @param req Request.
+     * @param desc Cache descriptor.
+     */
+    void addCacheToResetLostPartitions(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+        assert req.resetLostPartitions() : req;
+
+        cachesToResetLostParts = add(cachesToResetLostParts, req, desc);
+    }
+
+    /**
+     * @return {@code True} if there are no cache change actions.
+     */
+    public boolean empty() {
+        return F.isEmpty(cachesToStart) &&
+            F.isEmpty(clientCachesToStart) &&
+            F.isEmpty(cachesToStop) &&
+            F.isEmpty(cachesToClose) &&
+            F.isEmpty(cachesToResetLostParts);
+    }
+
+    /**
+     * Cache change request paired with the descriptor of the cache it applies to.
+     */
+    static class ActionData {
+        /** */
+        private DynamicCacheChangeRequest req;
+
+        /** */
+        private DynamicCacheDescriptor desc;
+
+        /**
+         * @param req Request.
+         * @param desc Cache descriptor.
+         */
+        ActionData(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+            assert req != null;
+            assert desc != null;
+
+            this.req = req;
+            this.desc = desc;
+        }
+
+        /**
+         * @return Request.
+         */
+        public DynamicCacheChangeRequest request() {
+            return req;
+        }
+
+        /**
+         * @return Cache descriptor.
+         */
+        public DynamicCacheDescriptor descriptor() {
+            return desc;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index a0489fc..aa503b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -234,8 +234,11 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** */
     private CountDownLatch startLatch = new CountDownLatch(1);
 
-    /** Start topology version. */
-    private AffinityTopologyVersion startTopVer;
+    /** Topology version when cache was started on local node. */
+    private AffinityTopologyVersion locStartTopVer;
+
+    /** */
+    private UUID rcvdFrom;
 
     /** Dynamic cache deployment ID. */
     private IgniteUuid dynamicDeploymentId;
@@ -289,6 +292,8 @@ public class GridCacheContext<K, V> implements Externalizable {
         GridCacheSharedContext sharedCtx,
         CacheConfiguration cacheCfg,
         CacheType cacheType,
+        AffinityTopologyVersion locStartTopVer,
+        UUID rcvdFrom,
         boolean affNode,
         boolean updatesAllowed,
         MemoryPolicy memPlc,
@@ -316,6 +321,7 @@ public class GridCacheContext<K, V> implements Externalizable {
         assert ctx != null;
         assert sharedCtx != null;
         assert cacheCfg != null;
+        assert locStartTopVer != null : cacheCfg.getName();
 
         assert evtMgr != null;
         assert storeMgr != null;
@@ -333,6 +339,8 @@ public class GridCacheContext<K, V> implements Externalizable {
         this.sharedCtx = sharedCtx;
         this.cacheCfg = cacheCfg;
         this.cacheType = cacheType;
+        this.locStartTopVer = locStartTopVer;
+        this.rcvdFrom = rcvdFrom;
         this.affNode = affNode;
         this.updatesAllowed = updatesAllowed;
         this.depEnabled = ctx.deploy().enabled() && !cacheObjects().isBinaryEnabled(cacheCfg);
@@ -452,17 +460,19 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
-     * @return Start topology version.
+     * @return ID of the node the cache was received from.
      */
-    public AffinityTopologyVersion startTopologyVersion() {
-        return startTopVer;
+    public UUID receivedFrom() {
+        return rcvdFrom;
     }
 
     /**
-     * @param startTopVer Start topology version.
+     * @return Topology version when cache was started on local node.
      */
-    public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
-        this.startTopVer = startTopVer;
+    public AffinityTopologyVersion startTopologyVersion() {
+        assert locStartTopVer != null : name();
+
+        return locStartTopVer;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4775ea1..04c647f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -235,34 +235,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (customMsg instanceof DynamicCacheChangeBatch) {
                         DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
 
-                        Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
-
-                        // Validate requests to check if event should trigger partition exchange.
-                        for (final DynamicCacheChangeRequest req : batch.requests()) {
-                            if (req.exchangeNeeded())
-                                valid.add(req);
-                            else {
-                                IgniteInternalFuture<?> fut = null;
-
-                                if (req.cacheFutureTopologyVersion() != null)
-                                    fut = affinityReadyFuture(req.cacheFutureTopologyVersion());
-
-                                if (fut == null || fut.isDone())
-                                    cctx.cache().completeStartFuture(req);
-                                else {
-                                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                                        @Override public void apply(IgniteInternalFuture<?> fut) {
-                                            cctx.cache().completeStartFuture(req);
-                                        }
-                                    });
-                                }
-                            }
-                        }
+                        ExchangeActions exchActions = batch.exchangeActions();
 
-                        if (!F.isEmpty(valid) && !(valid.size() == 1 && valid.iterator().next().globalStateChange())) {
+                        if (exchActions != null) {
                             exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
 
-                            exchFut = exchangeFuture(exchId, evt, cache, valid, null);
+                            exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
                         }
                     }
                     else if (customMsg instanceof CacheAffinityChangeMessage) {
@@ -385,10 +363,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         assert startTime > 0;
 
         // Generate dummy discovery event for local node joining.
-        T2<DiscoveryEvent, DiscoCache> localJoin = cctx.discovery().localJoin();
+        T2<DiscoveryEvent, DiscoCache> locJoin = cctx.discovery().localJoin();
 
-        DiscoveryEvent discoEvt = localJoin.get1();
-        DiscoCache discoCache = localJoin.get2();
+        DiscoveryEvent discoEvt = locJoin.get1();
+        DiscoCache discoCache = locJoin.get2();
 
         GridDhtPartitionExchangeId exchId = initialExchangeId();
 
@@ -488,8 +466,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
             }
 
+            AffinityTopologyVersion nodeStartVer = new AffinityTopologyVersion(discoEvt.topologyVersion(), 0);
+
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.startTopologyVersion() == null)
+                if (nodeStartVer.equals(cacheCtx.startTopologyVersion()))
                     cacheCtx.preloader().onInitialExchangeComplete(null);
             }
 
@@ -917,7 +897,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (exchId != null) {
                         AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
 
-                        ready = startTopVer == null || startTopVer.compareTo(exchId.topologyVersion()) <= 0;
+                        ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
                     }
                     else
                         ready = cacheCtx.started();
@@ -1123,25 +1103,25 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param exchId Exchange ID.
      * @param discoEvt Discovery event.
      * @param cache Discovery data cache.
-     * @param reqs Cache change requests.
+     * @param exchActions Cache change actions.
      * @param affChangeMsg Affinity change message.
      * @return Exchange future.
      */
     private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
         @Nullable DiscoveryEvent discoEvt,
         @Nullable DiscoCache cache,
-        @Nullable Collection<DynamicCacheChangeRequest> reqs,
+        @Nullable ExchangeActions exchActions,
         @Nullable CacheAffinityChangeMessage affChangeMsg) {
         GridDhtPartitionsExchangeFuture fut;
 
         GridDhtPartitionsExchangeFuture old = exchFuts.addx(
-            fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs, affChangeMsg));
+            fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, exchActions, affChangeMsg));
 
         if (old != null) {
             fut = old;
 
-            if (reqs != null)
-                fut.cacheChangeRequests(reqs);
+            if (exchActions != null)
+                fut.exchangeActions(exchActions);
 
             if (affChangeMsg != null)
                 fut.affinityChangeMessage(affChangeMsg);
@@ -1320,9 +1300,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 
-                    if (cacheCtx != null && cacheCtx.startTopologyVersion() != null &&
-                        entry.getValue() != null &&
-                        entry.getValue().topologyVersion() != null && // Backward compatibility.
+                    if (cacheCtx != null &&
                         cacheCtx.startTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0)
                         continue;
 


[2/6] ignite git commit: Moved logic related to caches discovery data handling to ClusterCachesInfo. Start of statically configured caches in the same way as dynamic ones: from GridDhtPartitionsExchangeFuture.

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index e8094e1..5d82171 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -44,6 +44,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private long futId;
+
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
@@ -69,19 +72,30 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
     }
 
     /**
+     * @param futId Future ID.
      * @param cacheId Cache ID.
      * @param topVer Topology version.
      * @param affAssignment Affinity assignment.
      */
-    public GridDhtAffinityAssignmentResponse(int cacheId,
+    public GridDhtAffinityAssignmentResponse(
+        long futId,
+        int cacheId,
         @NotNull AffinityTopologyVersion topVer,
         List<List<ClusterNode>> affAssignment) {
+        this.futId = futId;
         this.cacheId = cacheId;
         this.topVer = topVer;
 
         affAssignmentIds = ids(affAssignment);
     }
 
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean partitionExchangeMessage() {
         return true;
@@ -181,7 +195,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 7;
     }
 
     /**
@@ -239,12 +253,18 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 5:
+                if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -275,7 +295,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 4:
-                idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -283,6 +303,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 5:
+                idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 4f94ae2..741ca5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -32,12 +33,11 @@ import org.apache.ignite.internal.GridNodeOrderComparator;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
@@ -48,9 +48,6 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFF
  * Future that fetches affinity assignment from remote cache nodes.
  */
 public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffinityAssignmentResponse> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 
@@ -58,6 +55,9 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     private static IgniteLogger log;
 
     /** */
+    private static final AtomicLong idGen = new AtomicLong();
+
+    /** */
     private final GridCacheSharedContext ctx;
 
     /** List of available nodes this future can fetch data from. */
@@ -68,26 +68,33 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     private ClusterNode pendingNode;
 
     /** */
-    @GridToStringInclude
-    private final T2<Integer, AffinityTopologyVersion> key;
+    private final long id;
+
+    /** */
+    private final AffinityTopologyVersion topVer;
+
+    /** */
+    private final int cacheId;
 
     /**
      * @param ctx Context.
-     * @param cacheName Cache name.
+     * @param cacheDesc Cache descriptor.
      * @param topVer Topology version.
      * @param discoCache Discovery cache.
      */
     public GridDhtAssignmentFetchFuture(
         GridCacheSharedContext ctx,
-        String cacheName,
+        DynamicCacheDescriptor cacheDesc,
         AffinityTopologyVersion topVer,
         DiscoCache discoCache
     ) {
+        this.topVer = topVer;
+        this.cacheId = cacheDesc.cacheId();
         this.ctx = ctx;
-        int cacheId = CU.cacheId(cacheName);
-        this.key = new T2<>(cacheId, topVer);
 
-        Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheId);
+        id = idGen.getAndIncrement();
+
+        Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheDesc.cacheId());
 
         LinkedList<ClusterNode> tmp = new LinkedList<>();
 
@@ -105,19 +112,26 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     }
 
     /**
-     * Initializes fetch future.
+     * @return Cache ID.
      */
-    public void init() {
-        ctx.affinity().addDhtAssignmentFetchFuture(this);
+    public int cacheId() {
+        return cacheId;
+    }
 
-        requestFromNextNode();
+    /**
+     * @return Future ID.
+     */
+    public long id() {
+        return id;
     }
 
     /**
-     * @return Future key.
+     * Initializes fetch future.
      */
-    public T2<Integer, AffinityTopologyVersion> key() {
-        return key;
+    public void init() {
+        ctx.affinity().addDhtAssignmentFetchFuture(this);
+
+        requestFromNextNode();
     }
 
     /**
@@ -125,14 +139,6 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
      * @param res Response.
      */
     public void onResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
-        if (!res.topologyVersion().equals(key.get2())) {
-            if (log.isDebugEnabled())
-                log.debug("Received affinity assignment for wrong topology version (will ignore) " +
-                    "[node=" + nodeId + ", res=" + res + ", topVer=" + key.get2() + ']');
-
-            return;
-        }
-
         GridDhtAffinityAssignmentResponse res0 = null;
 
         synchronized (this) {
@@ -188,7 +194,8 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
                         log0.debug("Sending affinity fetch request to remote node [locNodeId=" + ctx.localNodeId() +
                             ", node=" + node + ']');
 
-                    ctx.io().send(node, new GridDhtAffinityAssignmentRequest(key.get1(), key.get2()),
+                    ctx.io().send(node,
+                        new GridDhtAffinityAssignmentRequest(id, cacheId, topVer),
                         AFFINITY_POOL);
 
                     // Close window for listener notification.

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 4e699b3..8e79eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -393,7 +393,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         int num = cctx.affinity().partitions();
 
         if (cctx.rebalanceEnabled()) {
-            boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion());
+            boolean added = exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom());
 
             boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
 
@@ -541,7 +541,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 cntrMap.clear();
 
                 // If this is the oldest node.
-                if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
+                if (oldest != null && (loc.equals(oldest) || exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom()))) {
                     if (node2part == null) {
                         node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
@@ -1156,10 +1156,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     // If for some nodes current partition has a newer map,
                     // then we keep the newer value.
                     if (newPart != null &&
-                        (newPart.updateSequence() < part.updateSequence() || (
-                            cctx.startTopologyVersion() != null &&
-                                newPart.topologyVersion() != null && // Backward compatibility.
-                                cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
+                        (newPart.updateSequence() < part.updateSequence() ||
+                        (cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
                         ) {
                         if (log.isDebugEnabled())
                             log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
@@ -1169,7 +1167,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     }
                 }
 
-                //remove entry if node left
+                // Remove entry if node left.
                 for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
                     UUID nodeId = it.next();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 6fb7df6..579796d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -122,6 +122,8 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
         boolean keepBinary,
         boolean skipStore
     ) {
+        assert topVer.topologyVersion() > 0 : topVer;
+
         this.cacheId = cacheId;
         this.nodeId = nodeId;
         this.futId = futId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 654d306..b4cb3c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -59,8 +59,8 @@ import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import org.apache.ignite.internal.processors.cache.ClusterState;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
-import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -110,9 +110,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
         IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);
 
-    /** */
-    private static final long serialVersionUID = 0L;
-
     /** Dummy flag. */
     private final boolean dummy;
 
@@ -190,8 +187,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /** Logger. */
     private final IgniteLogger log;
 
-    /** Dynamic cache change requests. */
-    private Collection<DynamicCacheChangeRequest> reqs;
+    /** Exchange actions. */
+    private ExchangeActions exchActions;
 
     /** */
     private CacheAffinityChangeMessage affChangeMsg;
@@ -284,19 +281,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @param cctx Cache context.
      * @param busyLock Busy lock.
      * @param exchId Exchange ID.
-     * @param reqs Cache change requests.
+     * @param exchActions Exchange actions.
      * @param affChangeMsg Affinity change message.
      */
     public GridDhtPartitionsExchangeFuture(
         GridCacheSharedContext cctx,
         ReadWriteLock busyLock,
         GridDhtPartitionExchangeId exchId,
-        Collection<DynamicCacheChangeRequest> reqs,
+        ExchangeActions exchActions,
         CacheAffinityChangeMessage affChangeMsg
     ) {
         assert busyLock != null;
         assert exchId != null;
         assert exchId.topologyVersion() != null;
+        assert exchActions == null || !exchActions.empty();
 
         dummy = false;
         forcePreload = false;
@@ -305,7 +303,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         this.cctx = cctx;
         this.busyLock = busyLock;
         this.exchId = exchId;
-        this.reqs = reqs;
+        this.exchActions = exchActions;
         this.affChangeMsg = affChangeMsg;
 
         log = cctx.logger(getClass());
@@ -317,10 +315,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * @param reqs Cache change requests.
+     * @param exchActions Exchange actions.
      */
-    public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) {
-        this.reqs = reqs;
+    public void exchangeActions(ExchangeActions exchActions) {
+        assert exchActions == null || !exchActions.empty() : exchActions;
+        assert evtLatch != null && evtLatch.getCount() == 1L : this;
+
+        this.exchActions = exchActions;
     }
 
     /**
@@ -379,33 +380,19 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
     /**
      * @param cacheId Cache ID to check.
-     * @param topVer Topology version.
+     * @param rcvdFrom ID of the node the cache was received from.
      * @return {@code True} if cache was added during this exchange.
      */
-    public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
-        if (cacheStarted(cacheId))
-            return true;
-
-        GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
-        return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
+    public boolean cacheAddedOnExchange(int cacheId, UUID rcvdFrom) {
+        return dynamicCacheStarted(cacheId) || (exchId.isJoined() && exchId.nodeId().equals(rcvdFrom));
     }
 
     /**
      * @param cacheId Cache ID.
      * @return {@code True} if non-client cache was added during this exchange.
      */
-    public boolean cacheStarted(int cacheId) {
-        if (!F.isEmpty(reqs)) {
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (req.start() && !req.clientStartOnly()) {
-                    if (CU.cacheId(req.cacheName()) == cacheId)
-                        return true;
-                }
-            }
-        }
-
-        return false;
+    public boolean dynamicCacheStarted(int cacheId) {
+        return exchActions != null && exchActions.cacheStarted(cacheId);
     }
 
     /**
@@ -435,14 +422,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      *
      */
     public ClusterState newClusterState() {
-        if (!F.isEmpty(reqs)) {
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (req.globalStateChange())
-                    return req.state();
-            }
-        }
-
-        return null;
+        return exchActions != null ? exchActions.newClusterState() : null;
     }
 
     /**
@@ -524,7 +504,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)discoEvt).customMessage();
 
                 if (msg instanceof DynamicCacheChangeBatch){
-                    assert !F.isEmpty(reqs);
+                    assert exchActions != null && !exchActions.empty();
 
                     exchange = onCacheChangeRequest(crdNode);
                 }
@@ -540,10 +520,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             }
             else {
                 if (discoEvt.type() == EVT_NODE_JOINED) {
-                    Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(topVer);
+                    if (!discoEvt.eventNode().isLocal()) {
+                        Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(
+                            discoEvt.eventNode().id(),
+                            topVer);
 
-                    if (!discoEvt.eventNode().isLocal())
                         cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
+                    }
+                    else
+                        cctx.cache().startCachesOnLocalJoin(topVer);
                 }
 
                 exchange = CU.clientNode(discoEvt.eventNode()) ?
@@ -553,20 +538,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             updateTopologies(crdNode);
 
-            if (!F.isEmpty(reqs)) {
-                boolean hasStop = false;
-
-                for (DynamicCacheChangeRequest req : reqs) {
-                    if (req.stop()) {
-                        hasStop = true;
-
-                        break;
-                    }
-                }
-
-                if (hasStop)
-                    cctx.cache().context().database().beforeCachesStop();
-            }
+            if (exchActions != null && exchActions.hasStop())
+                cctx.cache().context().database().beforeCachesStop();
 
             switch (exchange) {
                 case ALL: {
@@ -654,8 +627,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             GridDhtPartitionTopology top = cacheCtx.topology();
 
             if (crd) {
-                boolean updateTop = !cacheCtx.isLocal() &&
-                    exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
+                boolean updateTop = exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
 
                 if (updateTop && clientTop != null)
                     top.update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
@@ -674,32 +646,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedException {
-        assert !F.isEmpty(reqs) : this;
+        assert exchActions != null && !exchActions.empty() : this;
 
         GridClusterStateProcessor stateProc = cctx.kernalContext().state();
 
-        if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(reqs, topologyVersion())) {
+        if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(exchActions, topologyVersion())) {
             changeGlobalStateE = stateProc.onChangeGlobalState();
 
             if (crd && changeGlobalStateE != null)
                 changeGlobalStateExceptions.put(cctx.localNodeId(), changeGlobalStateE);
         }
 
-        boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, reqs);
-
-        if (clientOnly) {
-            boolean clientCacheStarted = false;
-
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (req.start() && req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId())) {
-                    clientCacheStarted = true;
+        boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
 
-                    break;
-                }
-            }
-
-            return clientCacheStarted ? ExchangeType.CLIENT : ExchangeType.NONE;
-        }
+        if (clientOnly)
+            return exchActions.clientCacheStarted(cctx.localNodeId()) ? ExchangeType.CLIENT : ExchangeType.NONE;
         else
             return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
     }
@@ -768,7 +729,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     private void clientOnlyExchange() throws IgniteCheckedException {
         clientOnlyExchange = true;
 
-        //todo checl invoke on client
         if (crd != null) {
             if (crd.isLocal()) {
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -1046,19 +1006,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @return {@code True} if cache is stopping by this exchange.
      */
     public boolean stopping(int cacheId) {
-        boolean stopping = false;
-
-        if (!F.isEmpty(reqs)) {
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (cacheId == CU.cacheId(req.cacheName())) {
-                    stopping = req.stop();
-
-                    break;
-                }
-            }
-        }
-
-        return stopping;
+        return exchActions != null && exchActions.cacheStopped(cacheId);
     }
 
     /**
@@ -1069,13 +1017,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         assert node != null;
 
         // Reset lost partition before send local partition to coordinator.
-        if (!F.isEmpty(reqs)) {
-            Set<String> caches = new HashSet<>();
-
-            for (DynamicCacheChangeRequest req : reqs) {
-                if (req.resetLostPartitions())
-                    caches.add(req.cacheName());
-            }
+        if (exchActions != null) {
+            Set<String> caches = exchActions.cachesToResetLostPartitions();
 
             if (!F.isEmpty(caches))
                 resetLostPartitions(caches);
@@ -1203,14 +1146,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             cacheValidRes = m;
         }
 
-        cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err);
+        cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, err);
 
         cctx.exchange().onExchangeDone(this, err);
 
-        if (!F.isEmpty(reqs) && err == null) {
-            for (DynamicCacheChangeRequest req : reqs)
-                cctx.cache().completeStartFuture(req);
-        }
+        if (exchActions != null && err == null)
+            exchActions.completeRequestFutures(cctx);
 
         if (exchangeOnChangeGlobalState && err == null)
             cctx.kernalContext().state().onExchangeDone();
@@ -1227,7 +1168,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     cacheCtx.config().getAffinity().removeNode(exchId.nodeId());
             }
 
-            reqs = null;
+            exchActions = null;
 
             if (discoEvt instanceof DiscoveryCustomEvent)
                 ((DiscoveryCustomEvent)discoEvt).customMessage(null);
@@ -1615,20 +1556,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 assert discoEvt instanceof DiscoveryCustomEvent;
 
                 if (((DiscoveryCustomEvent)discoEvt).customMessage() instanceof DynamicCacheChangeBatch) {
-                    DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)((DiscoveryCustomEvent)discoEvt)
-                        .customMessage();
+                    if (exchActions != null) {
+                        if (exchActions.newClusterState() == ClusterState.ACTIVE)
+                            assignPartitionsStates();
 
-                    Set<String> caches = new HashSet<>();
+                        Set<String> caches = exchActions.cachesToResetLostPartitions();
 
-                    for (DynamicCacheChangeRequest req : batch.requests()) {
-                        if (req.resetLostPartitions())
-                            caches.add(req.cacheName());
-                        else if (req.globalStateChange() && req.state() != ClusterState.INACTIVE)
-                            assignPartitionsStates();
+                        if (!F.isEmpty(caches))
+                            resetLostPartitions(caches);
                     }
-
-                    if (!F.isEmpty(caches))
-                        resetLostPartitions(caches);
                 }
             }
             else if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED)

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 9f1b96e..57616ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -608,7 +608,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
                 AffinityAssignment assignment = cctx.affinity().assignment(topVer);
 
-                GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(),
+                GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(
+                    req.futureId(),
+                    cctx.cacheId(),
                     topVer,
                     assignment.assignment());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 5e3dc3f..8b538ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -146,35 +146,8 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
         GridLocalLockFuture<K, V> fut = new GridLocalLockFuture<>(ctx, keys, tx, this, timeout, filter);
 
         try {
-            for (KeyCacheObject key : keys) {
-                while (true) {
-                    GridLocalCacheEntry entry = null;
-
-                    try {
-                        entry = entryExx(key);
-
-                        entry.unswap(false);
-
-                        if (!ctx.isAll(entry, filter)) {
-                            fut.onFailed();
-
-                            return fut;
-                        }
-
-                        // Removed exception may be thrown here.
-                        GridCacheMvccCandidate cand = fut.addEntry(entry);
-
-                        if (cand == null && fut.isDone())
-                            return fut;
-
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        if (log().isDebugEnabled())
-                            log().debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
-                    }
-                }
-            }
+            if (!fut.addEntries(keys))
+                return fut;
 
             if (!ctx.mvcc().addFuture(fut))
                 fut.onError(new IgniteCheckedException("Duplicate future ID (internal error): " + fut));

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 59d0adb..9641533 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -144,12 +144,51 @@ public final class GridLocalLockFuture<K, V> extends GridCacheFutureAdapter<Bool
 
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridLocalLockFuture.class);
+    }
+
+    /**
+     * @param keys Keys.
+     * @return {@code False} if entry filter check failed or future is already done, {@code true} otherwise.
+     * @throws IgniteCheckedException If failed.
+     */
+    public boolean addEntries(Collection<KeyCacheObject> keys) throws IgniteCheckedException {
+        for (KeyCacheObject key : keys) {
+            while (true) {
+                GridLocalCacheEntry entry = null;
+
+                try {
+                    entry = cache.entryExx(key);
+
+                    entry.unswap(false);
+
+                    if (!cctx.isAll(entry, filter)) {
+                        onFailed();
+
+                        return false;
+                    }
+
+                    // Removed exception may be thrown here.
+                    GridCacheMvccCandidate cand = addEntry(entry);
+
+                    if (cand == null && isDone())
+                        return false;
+
+                    break;
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
+                }
+            }
+        }
 
         if (timeout > 0) {
             timeoutObj = new LockTimeoutObject();
 
             cctx.time().addTimeoutObject(timeoutObj);
         }
+
+        return true;
     }
 
     /** {@inheritDoc} */
@@ -216,7 +255,7 @@ public final class GridLocalLockFuture<K, V> extends GridCacheFutureAdapter<Bool
      * @return Lock candidate.
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
-    @Nullable GridCacheMvccCandidate addEntry(GridLocalCacheEntry entry)
+    private @Nullable GridCacheMvccCandidate addEntry(GridLocalCacheEntry entry)
         throws GridCacheEntryRemovedException {
         // Add local lock first, as it may throw GridCacheEntryRemovedException.
         GridCacheMvccCandidate c = entry.addLocal(

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 0f7b0df..fcf534c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -272,8 +272,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         qryTopVer = cctx.startTopologyVersion();
 
-        if (qryTopVer == null)
-            qryTopVer = new AffinityTopologyVersion(cctx.localNode().order(), 0);
+        assert qryTopVer != null : cctx.name();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index efb02c6..e7706dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -561,16 +561,19 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /**
-     * Wait topology.
+     * @param ctx Context.
+     * @throws IgniteCheckedException In case of error.
      */
-    public void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException {
+    void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException {
         GridCacheContext<K, V> cctx = cacheContext(ctx);
 
         if (!cctx.isLocal()) {
-            cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get();
+            AffinityTopologyVersion topVer = initTopVer;
+
+            cacheContext(ctx).affinity().affinityReadyFuture(topVer).get();
 
             for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++)
-                getOrCreatePartitionRecovery(ctx, partId);
+                getOrCreatePartitionRecovery(ctx, partId, topVer);
         }
     }
 
@@ -736,7 +739,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
         }
 
-        PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
+        PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion());
 
         return rec.collectEntries(e, cctx, cache);
     }
@@ -869,37 +872,40 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /**
      * @param ctx Context.
      * @param partId Partition id.
+     * @param topVer Topology version for current operation.
      * @return Partition recovery.
      */
-    @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) {
+    @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx,
+        int partId,
+        AffinityTopologyVersion topVer) {
+        assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+
         PartitionRecovery rec = rcvs.get(partId);
 
         if (rec == null) {
             T2<Long, Long> partCntrs = null;
 
-            AffinityTopologyVersion initTopVer0 = initTopVer;
+            Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode = this.initUpdCntrsPerNode;
 
-            if (initTopVer0 != null) {
+            if (initUpdCntrsPerNode != null) {
                 GridCacheContext<K, V> cctx = cacheContext(ctx);
 
                 GridCacheAffinityManager aff = cctx.affinity();
 
-                if (initUpdCntrsPerNode != null) {
-                    for (ClusterNode node : aff.nodesByPartition(partId, initTopVer)) {
-                        Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
+                for (ClusterNode node : aff.nodesByPartition(partId, topVer)) {
+                    Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
 
-                        if (map != null) {
-                            partCntrs = map.get(partId);
+                    if (map != null) {
+                        partCntrs = map.get(partId);
 
-                            break;
-                        }
+                        break;
                     }
                 }
-                else if (initUpdCntrs != null)
-                    partCntrs = initUpdCntrs.get(partId);
             }
+            else if (initUpdCntrs != null)
+                partCntrs = initUpdCntrs.get(partId);
 
-            rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), initTopVer0,
+            rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
                 partCntrs != null ? partCntrs.get2() : null);
 
             PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 8377754..acf351f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
@@ -340,8 +341,14 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 updateCntr,
                 topVer);
 
-            CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
-                cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+            IgniteCacheProxy jcache = cctx.kernalContext().cache().jcacheProxy(cctx.name());
+
+            assert jcache != null : "Failed to get cache proxy [name=" + cctx.name() +
+                ", locStart=" + cctx.startTopologyVersion() +
+                ", locNode=" + cctx.localNode() +
+                ", stopping=" + cctx.kernalContext().isStopping();
+
+            CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(jcache, cctx, e0);
 
             lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fut);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 1286ba9..b25b229 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.cache.ClusterState;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
@@ -279,9 +280,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
                 List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
 
                 DynamicCacheChangeRequest changeGlobalStateReq = new DynamicCacheChangeRequest(
-                    requestId, null, ctx.localNodeId());
-
-                changeGlobalStateReq.state(activate ? ACTIVE : INACTIVE);
+                    requestId, activate ? ACTIVE : INACTIVE, ctx.localNodeId());
 
                 reqs.add(changeGlobalStateReq);
 
@@ -329,26 +328,25 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param reqs Requests.
+     * @param exchActions Exchange actions.
+     * @param topVer Exchange topology version.
      */
     public boolean changeGlobalState(
-        Collection<DynamicCacheChangeRequest> reqs,
+        ExchangeActions exchActions,
         AffinityTopologyVersion topVer
     ) {
-        assert !F.isEmpty(reqs);
+        assert exchActions != null;
         assert topVer != null;
 
-        for (DynamicCacheChangeRequest req : reqs)
-            if (req.globalStateChange()) {
-                ChangeGlobalStateContext cgsCtx = lastCgsCtx;
-
-                assert cgsCtx != null : "reqs: " + Arrays.toString(reqs.toArray());
+        if (exchActions.newClusterState() != null) {
+            ChangeGlobalStateContext cgsCtx = lastCgsCtx;
 
-                cgsCtx.topologyVersion(topVer);
+            assert cgsCtx != null : exchActions;
 
-                return true;
-            }
+            cgsCtx.topologyVersion(topVer);
 
+            return true;
+        }
 
         return false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 81f5c28..59c2656 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -427,7 +427,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
      */
     protected final void unregisterMBean() throws IgniteSpiException {
         // Unregister SPI MBean.
-        if (spiMBean != null) {
+        if (spiMBean != null && ignite != null) {
             MBeanServer jmx = ignite.configuration().getMBeanServer();
 
             assert jmx != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
index 96df255..803beed 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
@@ -188,14 +188,15 @@ public class DiscoveryDataBag {
     }
 
     /**
-     *
+     * @return ID of joining node.
      */
     public UUID joiningNodeId() {
         return joiningNodeId;
     }
 
     /**
-     * @param cmpId component ID.
+     * @param cmpId Component ID.
+     * @return Discovery data for given component.
      */
     public GridDiscoveryData gridDiscoveryData(int cmpId) {
         if (gridData == null)
@@ -207,7 +208,8 @@ public class DiscoveryDataBag {
     }
 
     /**
-     * @param cmpId component ID.
+     * @param cmpId Component ID.
+     * @return Joining node discovery data.
      */
     public JoiningNodeDiscoveryData newJoinerDiscoveryData(int cmpId) {
         if (newJoinerData == null)
@@ -219,7 +221,7 @@ public class DiscoveryDataBag {
     }
 
     /**
-     * @param cmpId component ID.
+     * @param cmpId Component ID.
      * @param data Data.
      */
     public void addJoiningNodeData(Integer cmpId, Serializable data) {
@@ -227,7 +229,7 @@ public class DiscoveryDataBag {
     }
 
     /**
-     * @param cmpId component ID.
+     * @param cmpId Component ID.
      * @param data Data.
      */
     public void addGridCommonData(Integer cmpId, Serializable data) {
@@ -235,7 +237,7 @@ public class DiscoveryDataBag {
     }
 
     /**
-     * @param cmpId component ID.
+     * @param cmpId Component ID.
      * @param data Data.
      */
     public void addNodeSpecificData(Integer cmpId, Serializable data) {
@@ -246,7 +248,8 @@ public class DiscoveryDataBag {
     }
 
     /**
-     * @param cmpId component ID.
+     * @param cmpId Component ID.
+     * @return {@code True} if common data collected for given component.
      */
     public boolean commonDataCollectedFor(Integer cmpId) {
         assert cmnDataInitializedCmps != null;
@@ -295,5 +298,4 @@ public class DiscoveryDataBag {
     @Nullable public Map<Integer, Serializable> localNodeSpecificData() {
         return nodeSpecificData.get(DEFAULT_KEY);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index cc581e1..2a55412 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1981,7 +1981,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
      *
      */
     void printStopInfo() {
-        if (log.isDebugEnabled())
+        IgniteLogger log = this.log;
+
+        if (log != null && log.isDebugEnabled())
             log.debug(stopInfo());
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/config/examples.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/examples.properties b/modules/core/src/test/config/examples.properties
index c584f72..ea0d8ed 100644
--- a/modules/core/src/test/config/examples.properties
+++ b/modules/core/src/test/config/examples.properties
@@ -22,3 +22,4 @@ ScalarCacheExample=examples/config/example-ignite.xml
 ScalarCacheQueryExample=examples/config/example-ignite.xml
 ScalarCountGraphTrianglesExample=examples/config/example-ignite.xml
 ScalarPopularNumbersRealTimeExample=examples/config/example-ignite.xml
+MemoryPolicyExample=examples/config/example-memory-policies.xml
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
index 2110c28..99e80ca 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
@@ -78,13 +78,13 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
 
                 Map<String, Integer> backupAssignedAttribute = getAttributeStatistic(assigned);
 
-                String nodeAttributeValue = node.attribute(SPLIT_ATTRIBUTE_NAME);
+                String nodeAttributeVal = node.attribute(SPLIT_ATTRIBUTE_NAME);
 
-                if (FIRST_NODE_GROUP.equals(nodeAttributeValue)
+                if (FIRST_NODE_GROUP.equals(nodeAttributeVal)
                     && backupAssignedAttribute.get(FIRST_NODE_GROUP) < 2)
                     return true;
 
-                return backupAssignedAttribute.get(nodeAttributeValue).equals(0);
+                return backupAssignedAttribute.get(nodeAttributeVal).equals(0);
             }
         };
 
@@ -107,10 +107,11 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
 
             String val = assignedNode.attribute(SPLIT_ATTRIBUTE_NAME);
 
-            Integer count = backupAssignedAttribute.get(val);
+            Integer cnt = backupAssignedAttribute.get(val);
 
-            backupAssignedAttribute.put(val, count + 1);
+            backupAssignedAttribute.put(val, cnt + 1);
         }
+
         return backupAssignedAttribute;
     }
 
@@ -157,6 +158,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
      */
     public void testPartitionDistribution() throws Exception {
         backups = 1;
+
         try {
             for (int i = 0; i < 3; i++) {
                 splitAttrVal = "A";
@@ -205,6 +207,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
      */
     public void testPartitionDistributionWithAffinityBackupFilter() throws Exception {
         backups = 3;
+
         try {
             for (int i = 0; i < 2; i++) {
                 splitAttrVal = FIRST_NODE_GROUP;

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
index 13fae24..b6114d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
@@ -35,12 +35,6 @@ import org.apache.log4j.WriterAppender;
 @SuppressWarnings({"ProhibitedExceptionDeclared"})
 @GridCommonTest(group = "Kernal")
 public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
-    /** */
-
-    public GridNodeMetricsLogSelfTest() {
-        super(false);
-    }
-
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked"})
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -51,6 +45,7 @@ public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
         return cfg;
     }
 
+    /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         stopAllGrids();
     }
@@ -80,11 +75,11 @@ public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
 
         cache2.put(2, "two");
 
-        Thread.sleep(10000);
+        Thread.sleep(10_000);
 
         //Check that nodes are alie
-        assert cache1.get(1).equals("one");
-        assert cache2.get(2).equals("two");
+        assertEquals("one", cache1.get(1));
+        assertEquals("two", cache2.get(2));
 
         String fullLog = strWr.toString();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
index 7dce36b..ae9986d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.Cache;
+import javax.cache.configuration.Factory;
 import javax.cache.expiry.CreatedExpiryPolicy;
 import javax.cache.expiry.Duration;
 import javax.cache.integration.CacheLoaderException;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -169,18 +171,7 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst
 
         cacheCfg.setRebalanceMode(SYNC);
 
-        if (igniteInstanceName.endsWith("1"))
-            cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_1));
-        else if (igniteInstanceName.endsWith("2"))
-            cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_2));
-        else if (igniteInstanceName.endsWith("3"))
-            cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_3));
-        else if (igniteInstanceName.endsWith("4"))
-            cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_4));
-        else if (igniteInstanceName.endsWith("5"))
-            cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_5));
-        else
-            cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_6));
+        cacheCfg.setCacheStoreFactory(new StoreFactory());
 
         cacheCfg.setWriteThrough(true);
         cacheCfg.setReadThrough(true);
@@ -840,4 +831,30 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst
             map.clear();
         }
     }
+
+    /**
+     *
+     */
+    static class StoreFactory implements Factory<CacheStore> {
+        /** */
+        @IgniteInstanceResource
+        private Ignite node;
+
+        @Override public CacheStore create() {
+            String igniteInstanceName = node.configuration().getIgniteInstanceName();
+
+            if (igniteInstanceName.endsWith("1"))
+                return LOCAL_STORE_1;
+            else if (igniteInstanceName.endsWith("2"))
+                return LOCAL_STORE_2;
+            else if (igniteInstanceName.endsWith("3"))
+                return LOCAL_STORE_3;
+            else if (igniteInstanceName.endsWith("4"))
+                return LOCAL_STORE_4;
+            else if (igniteInstanceName.endsWith("5"))
+                return LOCAL_STORE_5;
+            else
+                return LOCAL_STORE_6;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
index a4a831f..546ec06 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
@@ -44,13 +44,13 @@ public class IgniteCacheP2pUnmarshallingNearErrorTest extends IgniteCacheP2pUnma
 
     /** {@inheritDoc} */
     @Override public void testResponseMessageOnUnmarshallingFailed() throws InterruptedException {
-        //GridCacheEvictionRequest unmarshalling failed test
-        readCnt.set(5); //2 for each put
+        //GridCacheEvictionRequest unmarshalling failed test.
+        readCnt.set(5); //2 for each put.
 
         jcache(0).put(new TestKey(String.valueOf(++key)), "");
         jcache(0).put(new TestKey(String.valueOf(++key)), "");
 
-        //Eviction request unmarshalling failed but ioManager does not hangs up.
+        // Eviction request unmarshalling failed but ioManager does not hang up.
 
         // Wait for eviction complete.
         Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
new file mode 100644
index 0000000..eb8077f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests which nodes have a cache after cache start, node join and cache destroy.
+ */
+public class IgniteCacheStartTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String CACHE_NAME = "c1";
+
+    /** */
+    private boolean client;
+
+    /** */
+    private CacheConfiguration ccfg;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        if (ccfg != null)
+            cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void testStartAndNodeJoin() throws Exception {
+        Ignite node0 = startGrid(0);
+
+        checkCache(0, CACHE_NAME, false);
+
+        node0.createCache(cacheConfiguration(CACHE_NAME));
+
+        checkCache(0, CACHE_NAME, true);
+
+        startGrid(1);
+
+        checkCache(0, CACHE_NAME, true);
+        checkCache(1, CACHE_NAME, true);
+
+        client = true;
+
+        startGrid(2);
+
+        checkCache(0, CACHE_NAME, true);
+        checkCache(1, CACHE_NAME, true);
+        checkCache(2, CACHE_NAME, false);
+
+        ignite(2).destroyCache(CACHE_NAME);
+
+        checkCache(0, CACHE_NAME, false);
+        checkCache(1, CACHE_NAME, false);
+        checkCache(2, CACHE_NAME, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartFromJoiningNode1() throws Exception {
+        checkStartFromJoiningNode(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartFromJoiningNode2() throws Exception {
+        checkStartFromJoiningNode(true);
+    }
+
+    /**
+     * @param joinClient {@code True} if client node joins.
+     * @throws Exception If failed.
+     */
+    private void checkStartFromJoiningNode(boolean joinClient) throws Exception {
+        startGrid(0);
+        startGrid(1);
+
+        client = true;
+
+        startGrid(2);
+
+        ccfg = cacheConfiguration(CACHE_NAME);
+        client = joinClient;
+
+        startGrid(3);
+
+        checkCache(0, CACHE_NAME, true);
+        checkCache(1, CACHE_NAME, true);
+        checkCache(2, CACHE_NAME, false);
+        checkCache(3, CACHE_NAME, true);
+
+        client = false;
+        ccfg = null;
+
+        startGrid(4);
+
+        checkCache(0, CACHE_NAME, true);
+        checkCache(1, CACHE_NAME, true);
+        checkCache(2, CACHE_NAME, false);
+        checkCache(3, CACHE_NAME, true);
+        checkCache(4, CACHE_NAME, true);
+
+        client = true;
+
+        startGrid(5);
+
+        checkCache(0, CACHE_NAME, true);
+        checkCache(1, CACHE_NAME, true);
+        checkCache(2, CACHE_NAME, false);
+        checkCache(3, CACHE_NAME, true);
+        checkCache(4, CACHE_NAME, true);
+        checkCache(5, CACHE_NAME, false);
+
+        ignite(5).destroyCache(CACHE_NAME);
+
+        for (int i = 0; i < 5; i++)
+            checkCache(i, CACHE_NAME, false);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String cacheName) {
+        return new CacheConfiguration(cacheName);
+    }
+
+    /**
+     * @param idx Node index.
+     * @param cacheName Cache name.
+     * @param expCache {@code True} if cache should be created.
+     */
+    private void checkCache(int idx, final String cacheName, final boolean expCache) throws IgniteInterruptedCheckedException {
+        final IgniteKernal node = (IgniteKernal)ignite(idx);
+
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return expCache == (node.context().cache().cache(cacheName) != null);
+            }
+        }, 1000));
+
+        assertNotNull(node.context().cache().cache(CU.UTILITY_CACHE_NAME));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 4a34a1d..e7c5ca5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -1027,7 +1027,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
      * @param nearOnly Near only flag.
      * @throws Exception If failed.
      */
-    public void checkGetOrCreateNear(final boolean nearOnly) throws Exception {
+    private void checkGetOrCreateNear(final boolean nearOnly) throws Exception {
         try {
             final AtomicInteger cnt = new AtomicInteger(nodeCount());
             final AtomicReference<Throwable> err = new AtomicReference<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
index fd77309..057b0d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
@@ -130,7 +130,8 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
 
     /**
      * Tests topology split scenario.
-     * @throws Exception
+     *
+     * @throws Exception If failed.
      */
     public void testTopologyValidator() throws Exception {
         assertTrue(initLatch.await(10, TimeUnit.SECONDS));
@@ -242,12 +243,15 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
         /** */
         private static final long serialVersionUID = 0L;
 
+        /** */
         @CacheNameResource
         private String cacheName;
 
+        /** */
         @IgniteInstanceResource
         private Ignite ignite;
 
+        /** */
         @LoggerResource
         private IgniteLogger log;
 
@@ -263,7 +267,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
             }).isEmpty())
                 return false;
 
-            IgniteKernal kernal = (IgniteKernal)ignite.cache(cacheName).unwrap(Ignite.class);
+            IgniteKernal kernal = (IgniteKernal)ignite;
 
             GridDhtCacheAdapter<Object, Object> dht = kernal.context().cache().internalCache(cacheName).context().dht();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java
new file mode 100644
index 0000000..a80830a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.GridAtomicInteger;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ *
+ */
+@SuppressWarnings("unchecked")
+public class CacheDiscoveryDataConcurrentJoinTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Iteration. */
+    private static final int ITERATIONS = 3;
+
+    /** */
+    private boolean client;
+
+    /** */
+    private ThreadLocal<Integer> staticCaches = new ThreadLocal<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi testSpi = new TcpDiscoverySpi() {
+            /** */
+            private boolean delay = true;
+
+            @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+                if (getTestIgniteInstanceName(0).equals(ignite.name())) {
+                    if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+                        TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg;
+
+                        if (delay) {
+                            log.info("Delay join processing: " + msg0);
+
+                            delay = false;
+
+                            doSleep(5000);
+                        }
+                    }
+                }
+
+                super.startMessageProcess(msg);
+            }
+        };
+
+        testSpi.setIpFinder(ipFinder);
+        testSpi.setJoinTimeout(60_000);
+
+        cfg.setDiscoverySpi(testSpi);
+
+        cfg.setClientMode(client);
+
+        Integer caches = staticCaches.get();
+
+        if (caches != null) {
+            cfg.setCacheConfiguration(cacheConfigurations(caches).toArray(new CacheConfiguration[caches]));
+
+            staticCaches.remove();
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60 * 1000L;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentJoin() throws Exception {
+        for (int iter = 0; iter < ITERATIONS; iter++) {
+            log.info("Iteration: " + iter);
+
+            final int NODES = 6;
+            final int MAX_CACHES = 10;
+
+            final GridAtomicInteger caches = new GridAtomicInteger();
+
+            startGrid(0);
+
+            final AtomicInteger idx = new AtomicInteger(1);
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    int c = ThreadLocalRandom.current().nextInt(MAX_CACHES) + 1;
+
+                    staticCaches.set(c);
+
+                    startGrid(idx.getAndIncrement());
+
+                    caches.setIfGreater(c);
+
+                    return null;
+                }
+            }, NODES - 1, "start-node");
+
+            assertTrue(caches.get() > 0);
+
+            for (int i = 0; i < NODES; i++) {
+                Ignite node = ignite(i);
+
+                for (int c = 0; c < caches.get(); c++) {
+                    Collection<ClusterNode> nodes = node.cluster().forCacheNodes("cache-" + c).nodes();
+
+                    assertEquals(NODES, nodes.size());
+
+                    checkCache(node, "cache-" + c);
+                }
+            }
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param caches Number of caches.
+     * @return Cache configurations.
+     */
+    private Collection<CacheConfiguration> cacheConfigurations(int caches) {
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        for (int i = 0; i < caches; i++)
+            ccfgs.add(cacheConfiguration("cache-" + i));
+
+        return ccfgs;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String cacheName) {
+        CacheConfiguration ccfg = new CacheConfiguration(cacheName);
+
+        ccfg.setName(cacheName);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+
+        return ccfg;
+    }
+    /**
+     * @param node Node.
+     * @param cacheName Cache name.
+     */
+    private void checkCache(Ignite node, final String cacheName) {
+        assertNotNull(((IgniteKernal)node).context().cache().cache(cacheName));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index fed388a..bc435e2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -2165,7 +2165,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
         Map<String, List<List<ClusterNode>>> aff = new HashMap<>();
 
         for (Ignite node : nodes) {
-            log.info("Check node: " + node.name());
+            log.info("Check affinity [node=" + node.name() + ", topVer=" + topVer + ", expIdeal=" + expIdeal + ']');
 
             IgniteKernal node0 = (IgniteKernal)node;
 
@@ -2175,7 +2175,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
                 fut.get();
 
             for (GridCacheContext cctx : node0.context().cache().context().cacheContexts()) {
-                if (cctx.startTopologyVersion() != null && cctx.startTopologyVersion().compareTo(topVer) > 0)
+                if (cctx.startTopologyVersion().compareTo(topVer) > 0)
                     continue;
 
                 List<List<ClusterNode>> aff1 = aff.get(cctx.name());

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
index 321faf8..88df607 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -210,6 +211,8 @@ public class CacheStartOnJoinTest extends GridCommonAbstractTest {
                 Collection<ClusterNode> nodes = node.cluster().forCacheNodes("cache-" + c).nodes();
 
                 assertEquals(NODES, nodes.size());
+
+                checkCache(node, "cache-" + c);
             }
 
             for (int c = 0; c < 5; c++) {
@@ -247,4 +250,11 @@ public class CacheStartOnJoinTest extends GridCommonAbstractTest {
 
         return ccfg;
     }
+    /**
+     * @param node Node.
+     * @param cacheName Cache name.
+     */
+    private void checkCache(Ignite node, final String cacheName) {
+        assertNotNull(((IgniteKernal)node).context().cache().cache(cacheName));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
index 4864a67..bf5ba61 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
@@ -26,6 +26,7 @@ import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreSession;
@@ -35,6 +36,7 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.jetbrains.annotations.Nullable;
@@ -357,9 +359,12 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
      *
      */
     private static class FirstStoreFactory implements Factory<CacheStore> {
+        @IgniteInstanceResource
+        private Ignite ignite;
+
         /** {@inheritDoc} */
         @Override public CacheStore create() {
-            String igniteInstanceName = startingIgniteInstanceName.get();
+            String igniteInstanceName = ignite.name();
 
             CacheStore store = firstStores.get(igniteInstanceName);
 
@@ -374,9 +379,12 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
      *
      */
     private static class SecondStoreFactory implements Factory<CacheStore> {
+        @IgniteInstanceResource
+        private Ignite ignite;
+
         /** {@inheritDoc} */
         @Override public CacheStore create() {
-            String igniteInstanceName = startingIgniteInstanceName.get();
+            String igniteInstanceName = ignite.name();
 
             CacheStore store = secondStores.get(igniteInstanceName);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
index 372da32..a7128e0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
@@ -22,7 +22,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.cache.configuration.Factory;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
@@ -102,11 +104,7 @@ public class GridCachePartitionedBasicStoreMultiNodeSelfTest extends GridCommonA
         cc.setAtomicityMode(TRANSACTIONAL);
         cc.setBackups(1);
 
-        GridCacheTestStore store = new GridCacheTestStore();
-
-        stores.add(store);
-
-        cc.setCacheStoreFactory(singletonFactory(store));
+        cc.setCacheStoreFactory(new StoreFactory());
         cc.setReadThrough(true);
         cc.setWriteThrough(true);
         cc.setLoadPreviousValue(true);
@@ -269,4 +267,18 @@ public class GridCachePartitionedBasicStoreMultiNodeSelfTest extends GridCommonA
         assertEquals(expPutAll, putAll);
         assertEquals(expTxs, txs);
     }
+
+    /**
+     *
+     */
+    static class StoreFactory implements Factory<CacheStore> {
+        /** {@inheritDoc} */
+        @Override public CacheStore create() {
+            GridCacheTestStore store = new GridCacheTestStore();
+
+            stores.add(store);
+
+            return store;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 0f4aa87..2096179 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.loadtests.hashmap;
 
 import java.util.IdentityHashMap;
+import java.util.UUID;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
 import org.apache.ignite.internal.processors.cache.CacheOsConflictResolutionManager;
 import org.apache.ignite.internal.processors.cache.CacheType;
@@ -78,6 +80,8 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
             ),
             defaultCacheConfiguration(),
             CacheType.USER,
+            AffinityTopologyVersion.ZERO,
+            UUID.randomUUID(),
             true,
             true,
             null,