You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/26 16:38:58 UTC

[2/2] ignite git commit: cache discovery data refactoring

cache discovery data refactoring


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c73e1c90
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c73e1c90
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c73e1c90

Branch: refs/heads/ignite-5075-cacheStart
Commit: c73e1c90ba7fea972e95728b94078a69c35bafbd
Parents: 4be320a
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 26 14:01:40 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 26 19:38:44 2017 +0300

----------------------------------------------------------------------
 .../internal/processors/cache/CacheData.java    | 152 ++++
 .../cache/CacheJoinNodeDiscoveryData.java       |  91 ++
 .../cache/CacheNodeCommonDiscoveryData.java     |  56 ++
 .../CacheReconnectClientDiscoveryData.java      |  26 +
 .../processors/cache/ClusterCachesInfo.java     | 620 ++++++++++++++
 .../cache/DynamicCacheDescriptor.java           |   3 -
 .../processors/cache/GridCacheContext.java      |   6 +
 .../processors/cache/GridCacheIoManager.java    |   9 +-
 .../processors/cache/GridCacheProcessor.java    | 834 +++++++------------
 .../GridDhtPartitionsExchangeFuture.java        |   9 +-
 10 files changed, 1270 insertions(+), 536 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
new file mode 100644
index 0000000..4579c27
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ *
+ */
+public class CacheData implements Serializable {
+    /** */
+    private final CacheConfiguration cacheCfg;
+
+    /** */
+    private final Integer cacheId;
+
+    /** */
+    private final CacheType cacheType;
+
+    /** */
+    private final AffinityTopologyVersion startTopVer;
+
+    /** */
+    private final IgniteUuid deploymentId;
+
+    /** */
+    private final QuerySchema schema;
+
+    /** */
+    private final UUID rcvdFrom;
+
+    /** */
+    private final boolean staticCfg;
+
+    /** */
+    private final boolean template;
+
+    CacheData(CacheConfiguration cacheCfg,
+        int cacheId,
+        CacheType cacheType,
+        AffinityTopologyVersion startTopVer,
+        IgniteUuid deploymentId,
+        QuerySchema schema,
+        UUID rcvdFrom,
+        boolean staticCfg,
+        boolean template) {
+        assert cacheCfg != null;
+        assert rcvdFrom != null;
+        assert startTopVer != null;
+        assert deploymentId != null;
+        assert template || cacheId != 0;
+
+        this.cacheCfg = cacheCfg;
+        this.cacheId = cacheId;
+        this.cacheType = cacheType;
+        this.startTopVer = startTopVer;
+        this.deploymentId = deploymentId;
+        this.schema = schema;
+        this.rcvdFrom = rcvdFrom;
+        this.staticCfg = staticCfg;
+        this.template = template;
+    }
+
+    /**
+     * @return Cache ID.
+     */
+    public Integer cacheId() {
+        return cacheId;
+    }
+
+    /**
+     * @return Start topology version.
+     */
+    public AffinityTopologyVersion startTopologyVersion() {
+        return startTopVer;
+    }
+
+    /**
+     * @return {@code True} if this is template configuration.
+     */
+    public boolean template() {
+        return template;
+    }
+
+    /**
+     * @return Cache type.
+     */
+    public CacheType cacheType() {
+        return cacheType;
+    }
+
+    /**
+     * @return Start ID.
+     */
+    public IgniteUuid deploymentId() {
+        return deploymentId;
+    }
+
+    /**
+     * @return {@code True} if statically configured.
+     */
+    public boolean staticallyConfigured() {
+        return staticCfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    public CacheConfiguration cacheConfiguration() {
+        return cacheCfg;
+    }
+
+    /**
+     * @return Schema.
+     */
+    public QuerySchema schema() {
+        return schema.copy();
+    }
+
+    /**
+     * @return ID of node provided cache configuration.
+     */
+    public UUID receivedFrom() {
+        return rcvdFrom;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheData.class, this, "cacheName", cacheCfg.getName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
new file mode 100644
index 0000000..0624217
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ *
+ */
+class CacheJoinNodeDiscoveryData implements Serializable {
+    /** */
+    private final Map<String, CacheInfo> caches;
+
+    /** */
+    private final Map<String, CacheInfo> templates;
+
+    /** */
+    private final IgniteUuid cacheDeploymentId;
+
+    /**
+     * @param cacheDeploymentId Deployment ID for started caches.
+     * @param caches Caches.
+     * @param templates Templates.
+     */
+    CacheJoinNodeDiscoveryData(
+        IgniteUuid cacheDeploymentId,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates) {
+        this.cacheDeploymentId = cacheDeploymentId;
+        this.caches = caches;
+        this.templates = templates;
+    }
+
+    IgniteUuid cacheDeploymentId() {
+        return cacheDeploymentId;
+    }
+
+    Map<String, CacheInfo> templates() {
+        return templates;
+    }
+
+    Map<String, CacheInfo> caches() {
+        return caches;
+    }
+
+    /**
+     *
+     */
+    static class CacheInfo implements Serializable {
+        /** */
+        private final CacheConfiguration ccfg;
+
+        /** */
+        private final CacheType cacheType;
+
+        /** */
+        private final byte flags;
+
+        CacheInfo(CacheConfiguration ccfg, CacheType cacheType, byte flags) {
+            this.ccfg = ccfg;
+            this.cacheType = cacheType;
+            this.flags = flags;
+        }
+
+        CacheConfiguration config() {
+            return ccfg;
+        }
+
+        CacheType cacheType() {
+            return cacheType;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
new file mode 100644
index 0000000..10df452
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ *
+ */
+class CacheNodeCommonDiscoveryData implements Serializable {
+    /** */
+    private final Map<String, CacheData> caches;
+
+    /** */
+    private final Map<String, CacheData> templates;
+
+    /** */
+    private final Map<String, Map<UUID, Boolean>> clientNodesMap;
+
+    CacheNodeCommonDiscoveryData(Map<String, CacheData> caches,
+        Map<String, CacheData> templates,
+        Map<String, Map<UUID, Boolean>> clientNodesMap) {
+        this.caches = caches;
+        this.templates = templates;
+        this.clientNodesMap = clientNodesMap;
+    }
+
+    Map<String, CacheData> caches() {
+        return caches;
+    }
+
+    Map<String, CacheData> templates() {
+        return templates;
+    }
+
+    Map<String, Map<UUID, Boolean>> clientNodesMap() {
+        return clientNodesMap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectClientDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectClientDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectClientDiscoveryData.java
new file mode 100644
index 0000000..10a8f7e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectClientDiscoveryData.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+
+/**
+ *
+ */
+public class CacheReconnectClientDiscoveryData implements Serializable {
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
new file mode 100644
index 0000000..bd4ee1f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheExistsException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+
+/**
+ *
+ */
+class ClusterCachesInfo {
+    /** */
+    private final GridKernalContext ctx;
+
+    /** Dynamic caches. */
+    private final ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
+
+    /** Cache templates. */
+    private final ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
+
+    /** */
+    private CacheJoinNodeDiscoveryData joinDiscoData;
+
+    /** */
+    private CacheNodeCommonDiscoveryData gridData;
+
+    /** */
+    private List<DynamicCacheDescriptor> locJoinStartCaches;
+
+    /**
+     * @param ctx Context.
+     */
+    ClusterCachesInfo(GridKernalContext ctx) {
+        this.ctx = ctx;
+    }
+
+    void onStart(CacheJoinNodeDiscoveryData joinDiscoData) {
+        this.joinDiscoData = joinDiscoData;
+    }
+
+    void onKernalStart() throws IgniteCheckedException {
+
+    }
+
+    /**
+     * @param batch Cache change request.
+     * @param topVer Topology version.
+     * @return {@code True} if minor topology version should be increased.
+     */
+    boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) {
+        boolean incMinorTopVer = false;
+
+        List<DynamicCacheDescriptor> added = null;
+
+        for (DynamicCacheChangeRequest req : batch.requests()) {
+            if (req.template()) {
+                CacheConfiguration ccfg = req.startCacheConfiguration();
+
+                assert ccfg != null : req;
+
+                DynamicCacheDescriptor desc = registeredTemplates().get(req.cacheName());
+
+                if (desc == null) {
+                    DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx,
+                        ccfg,
+                        req.cacheType(),
+                        true,
+                        req.deploymentId(),
+                        req.schema());
+
+                    templateDesc.receivedFrom(req.initiatingNodeId());
+
+                    DynamicCacheDescriptor old = registeredTemplates().put(ccfg.getName(), templateDesc);
+
+                    assert old == null;
+
+                    if (added == null)
+                        added = new ArrayList<>();
+
+                    added.add(templateDesc);
+                }
+
+                ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
+
+                continue;
+            }
+
+            DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName());
+
+            boolean needExchange = false;
+
+            if (req.start()) {
+                if (desc == null) {
+                    if (req.clientStartOnly()) {
+                        ctx.cache().completeCacheStartFuture(req, new IgniteCheckedException("Failed to start " +
+                            "client cache (a cache with the given name is not started): " + req.cacheName()));
+                    }
+                    else {
+                        CacheConfiguration ccfg = req.startCacheConfiguration();
+
+                        assert req.cacheType() != null : req;
+                        assert F.eq(ccfg.getName(), req.cacheName()) : req;
+
+                        DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx,
+                            ccfg,
+                            req.cacheType(),
+                            false,
+                            req.deploymentId(),
+                            req.schema());
+
+                        startDesc.receivedFrom(req.initiatingNodeId());
+
+                        DynamicCacheDescriptor old = registeredCaches.put(ccfg.getName(), startDesc);
+
+                        assert old == null;
+
+                        ctx.discovery().setCacheFilter(
+                            ccfg.getName(),
+                            ccfg.getNodeFilter(),
+                            ccfg.getNearConfiguration() != null,
+                            ccfg.getCacheMode());
+
+                        ctx.discovery().addClientNode(req.cacheName(),
+                            req.initiatingNodeId(),
+                            req.nearCacheConfiguration() != null);
+
+                        added.add(startDesc);
+
+                        needExchange = true;
+                    }
+                }
+                else {
+                    assert req.initiatingNodeId() != null : req;
+
+                    // Cache already exists, exchange is needed only if client cache should be created.
+                    ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
+
+                    boolean clientReq = node != null &&
+                        !ctx.discovery().cacheAffinityNode(node, req.cacheName());
+
+                    if (req.clientStartOnly()) {
+                        needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
+                            req.initiatingNodeId(),
+                            req.nearCacheConfiguration() != null);
+                    }
+                    else {
+                        if (req.failIfExists()) {
+                            ctx.cache().completeCacheStartFuture(req,
+                                new CacheExistsException("Failed to start cache " +
+                                    "(a cache with the same name is already started): " + req.cacheName()));
+                        }
+                        else {
+                            needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
+                                req.initiatingNodeId(),
+                                req.nearCacheConfiguration() != null);
+
+                            if (needExchange)
+                                req.clientStartOnly(true);
+                        }
+                    }
+
+                    if (needExchange) {
+                        if (newTopVer == null) {
+                            newTopVer = new AffinityTopologyVersion(topVer.topologyVersion(),
+                                topVer.minorTopologyVersion() + 1);
+                        }
+
+                        desc.clientCacheStartVersion(newTopVer);
+                    }
+                }
+
+                if (!needExchange && desc != null) {
+                    if (desc.clientCacheStartVersion() != null)
+                        req.cacheFutureTopologyVersion(desc.clientCacheStartVersion());
+                    else
+                        req.cacheFutureTopologyVersion(desc.startTopologyVersion());
+                }
+            }
+            else if (req.globalStateChange() || req.resetLostPartitions())
+                needExchange = true;
+            else {
+                assert req.stop() ^ req.close() : req;
+
+                if (desc != null) {
+                    if (req.stop()) {
+                        DynamicCacheDescriptor old = cachesInfo.registeredCaches().remove(maskNull(req.cacheName()));
+
+                        assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
+
+                        ctx.discovery().removeCacheFilter(req.cacheName());
+
+                        needExchange = true;
+                    }
+                    else {
+                        assert req.close() : req;
+
+                        needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
+                    }
+                }
+            }
+
+            req.exchangeNeeded(needExchange);
+
+            incMinorTopVer |= needExchange;
+        }
+
+        if (added != null) {
+            AffinityTopologyVersion startTopVer = incMinorTopVer ?
+                new AffinityTopologyVersion(topVer.topologyVersion(), topVer.minorTopologyVersion() + 1) : topVer;
+
+            for (DynamicCacheDescriptor desc : added)
+                desc.startTopologyVersion(startTopVer);
+        }
+
+        return incMinorTopVer;
+    }
+
+    CacheJoinNodeDiscoveryData joinDiscoveryData() {
+        if (cachesOnDisconnect != null) {
+//            Collection<DynamicCacheChangeRequest> reqs;
+//
+//            Map<String, Map<UUID, Boolean>> clientNodesMap;
+//
+//            reqs = new ArrayList<>(caches.size() + 1);
+//
+//            clientNodesMap = U.newHashMap(caches.size());
+//
+//            collectDataOnReconnectingNode(reqs, clientNodesMap, joiningNodeId);
+
+            // TODO
+            return null;
+        }
+        else {
+            assert ctx.config().isDaemon() || joinDiscoData != null;
+
+            return joinDiscoData;
+        }
+    }
+
+    /**
+     * @param reqs requests.
+     * @param clientNodesMap Client nodes map.
+     * @param nodeId Node id.
+     */
+    private void collectDataOnReconnectingNode(
+        Collection<GridCacheAdapter> caches,
+        Collection<DynamicCacheChangeRequest> reqs,
+        Map<String, Map<UUID, Boolean>> clientNodesMap,
+        UUID nodeId
+    ) {
+        for (GridCacheAdapter<?, ?> cache : caches) {
+            DynamicCacheDescriptor desc = cachesOnDisconnect.get(cache.name());
+
+            if (desc == null)
+                continue;
+
+            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, cache.name(), null);
+
+            req.startCacheConfiguration(desc.cacheConfiguration());
+            req.cacheType(desc.cacheType());
+            req.deploymentId(desc.deploymentId());
+            req.receivedFrom(desc.receivedFrom());
+            req.schema(desc.schema());
+
+            reqs.add(req);
+
+            Boolean nearEnabled = cache.isNear();
+
+            Map<UUID, Boolean> map = U.newHashMap(1);
+
+            map.put(nodeId, nearEnabled);
+
+            clientNodesMap.put(cache.name(), map);
+        }
+    }
+
+    /**
+     * Called from exchange worker.
+     *
+     * @return Caches to be started when this node starts.
+     */
+    List<DynamicCacheDescriptor> cachesToStartOnLocalJoin() {
+        assert locJoinStartCaches != null;
+
+        List<DynamicCacheDescriptor> locJoinStartCaches = this.locJoinStartCaches;
+
+        this.locJoinStartCaches = null;
+
+        return locJoinStartCaches;
+    }
+
+    List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) {
+        assert joinedNodeId != null;
+
+        List<DynamicCacheDescriptor> started = null;
+
+        if (!ctx.clientNode() && !ctx.isDaemon()) {
+            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                if (desc.staticallyConfigured()) {
+                    assert desc.receivedFrom() != null : desc;
+
+                    IgnitePredicate<ClusterNode> filter = desc.cacheConfiguration().getNodeFilter();
+
+                    if (joinedNodeId.equals(desc.receivedFrom()) &&
+                        CU.affinityNode(ctx.discovery().localNode(), filter)) {
+                        if (started == null)
+                            started = new ArrayList<>();
+
+                        started.add(desc);
+                    }
+                }
+            }
+        }
+
+        return started;
+    }
+
+    /**
+     * Discovery event callback, executed from discovery thread.
+     *
+     * @param type Event type.
+     * @param node Event node.
+     * @param topVer Topology version.
+     */
+    void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+        if (type == EVT_NODE_JOINED) {
+            if (node.id().equals(ctx.discovery().localNode().id())) {
+                if (gridData == null) { // First node starts.
+                    assert registeredCaches.isEmpty();
+                    assert registeredTemplates.isEmpty();
+                    assert joinDiscoData != null;
+
+                    processJoiningNode(joinDiscoData, node.id());
+                }
+
+                assert locJoinStartCaches == null;
+
+                locJoinStartCaches = new ArrayList<>();
+
+                for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                    CacheConfiguration cfg = desc.cacheConfiguration();
+
+                    boolean locCfg = joinDiscoData.caches().containsKey(cfg.getName());
+
+                    if (locCfg || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter()))
+                        locJoinStartCaches.add(desc);
+                }
+
+                joinDiscoData = null;
+            }
+
+            initStartVersionOnJoin(registeredCaches.values(), node, topVer);
+
+            initStartVersionOnJoin(registeredTemplates.values(), node, topVer);
+        }
+    }
+
+    private void initStartVersionOnJoin(Collection<DynamicCacheDescriptor> descs,
+        ClusterNode joinedNode,
+        AffinityTopologyVersion topVer) {
+        for (DynamicCacheDescriptor cacheDesc : descs) {
+            if (cacheDesc.staticallyConfigured() && joinedNode.id().equals(cacheDesc.receivedFrom()))
+                cacheDesc.startTopologyVersion(topVer);
+        }
+    }
+
+    CacheNodeCommonDiscoveryData collectCommonDiscoveryData() {
+        Map<String, CacheData> caches = new HashMap<>();
+
+        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            CacheData cacheData = new CacheData(desc.cacheConfiguration(),
+                desc.cacheId(),
+                desc.cacheType(),
+                desc.startTopologyVersion(),
+                desc.deploymentId(),
+                desc.schema(),
+                desc.receivedFrom(),
+                desc.staticallyConfigured(),
+                false);
+
+            caches.put(desc.cacheConfiguration().getName(), cacheData);
+        }
+
+        Map<String, CacheData> templates = new HashMap<>();
+
+        for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
+            CacheData cacheData = new CacheData(desc.cacheConfiguration(),
+                0,
+                desc.cacheType(),
+                desc.startTopologyVersion(),
+                null,
+                desc.schema(),
+                desc.receivedFrom(),
+                desc.staticallyConfigured(),
+                true);
+
+            templates.put(desc.cacheConfiguration().getName(), cacheData);
+        }
+
+        return new CacheNodeCommonDiscoveryData(caches, templates, ctx.discovery().clientNodesMap());
+    }
+
+    void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+        assert joinDiscoData != null;
+        assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data;
+
+        CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData();
+
+        for (CacheData cacheData : cachesData.templates().values()) {
+            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+                ctx,
+                cacheData.cacheConfiguration(),
+                cacheData.cacheType(),
+                true,
+                cacheData.deploymentId(),
+                cacheData.schema());
+
+            desc.startTopologyVersion(cacheData.startTopologyVersion());
+            desc.receivedFrom(cacheData.receivedFrom());
+            desc.staticallyConfigured(cacheData.staticallyConfigured());
+
+            DynamicCacheDescriptor old = registeredTemplates.put(cacheData.cacheConfiguration().getName(), desc);
+
+            assert old == null;
+        }
+
+        for (CacheData cacheData : cachesData.caches().values()) {
+            CacheConfiguration cfg = cacheData.cacheConfiguration();
+
+            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+                ctx,
+                cacheData.cacheConfiguration(),
+                cacheData.cacheType(),
+                false,
+                cacheData.deploymentId(),
+                cacheData.schema());
+
+            desc.startTopologyVersion(cacheData.startTopologyVersion());
+            desc.receivedFrom(cacheData.receivedFrom());
+            desc.staticallyConfigured(cacheData.staticallyConfigured());
+
+            DynamicCacheDescriptor old = registeredCaches.put(cacheData.cacheConfiguration().getName(), desc);
+
+            assert old == null;
+
+            ctx.discovery().setCacheFilter(
+                cfg.getName(),
+                cfg.getNodeFilter(),
+                cfg.getNearConfiguration() != null,
+                cfg.getCacheMode());
+        }
+
+        if (!F.isEmpty(cachesData.clientNodesMap())) {
+            for (Map.Entry<String, Map<UUID, Boolean>> entry : cachesData.clientNodesMap().entrySet()) {
+                String cacheName = entry.getKey();
+
+                for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
+                    ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
+            }
+        }
+
+        gridData = cachesData;
+    }
+
+    void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
+        if (data.hasJoiningNodeData()) {
+            Serializable joiningNodeData = data.joiningNodeData();
+
+            if (joiningNodeData instanceof CacheReconnectClientDiscoveryData) {
+                // TODO
+            }
+            else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData)
+                processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId());
+        }
+    }
+
+    private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) {
+        for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) {
+            CacheConfiguration cfg = cacheInfo.config();
+
+            if (!registeredTemplates.containsKey(cfg.getName())) {
+                DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
+                    cfg,
+                    cacheInfo.cacheType(),
+                    true,
+                    joinData.cacheDeploymentId(),
+                    new QuerySchema(cfg.getQueryEntities()));
+
+                desc.staticallyConfigured(true);
+                desc.receivedFrom(nodeId);
+
+                DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
+
+                assert old == null : old;
+            }
+        }
+
+        for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.caches().values()) {
+            CacheConfiguration cfg = cacheInfo.config();
+
+            if (!registeredCaches.containsKey(cfg.getName())) {
+                DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
+                    cfg,
+                    cacheInfo.cacheType(),
+                    false,
+                    joinData.cacheDeploymentId(),
+                    new QuerySchema(cfg.getQueryEntities()));
+
+                desc.staticallyConfigured(true);
+                desc.receivedFrom(nodeId);
+
+                DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
+
+                assert old == null : old;
+
+                ctx.discovery().setCacheFilter(
+                    cfg.getName(),
+                    cfg.getNodeFilter(),
+                    cfg.getNearConfiguration() != null,
+                    cfg.getCacheMode());
+            }
+
+            ctx.discovery().addClientNode(cfg.getName(),
+                nodeId,
+                cfg.getNearConfiguration() != null);
+        }
+    }
+
+    ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches() {
+        return registeredCaches;
+    }
+
+    ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates() {
+        return registeredTemplates;
+    }
+
+    /** */
+    private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
+
+    void onDisconnect() {
+        cachesOnDisconnect = new HashMap<>(registeredCaches);
+
+        registeredCaches.clear();
+        registeredTemplates.clear();
+    }
+
+    Set<String> onReconnected() {
+        assert cachesOnDisconnect != null;
+
+        Set<String> stoppedCaches = new HashSet<>();
+
+        for(Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.entrySet()) {
+            DynamicCacheDescriptor desc = e.getValue();
+
+            String name = e.getKey();
+
+            boolean stopped;
+
+            boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name);
+
+            if (!sysCache) {
+                DynamicCacheDescriptor newDesc = registeredCaches.get(name);
+
+                stopped = newDesc == null || !desc.deploymentId().equals(newDesc.deploymentId());
+            }
+            else
+                stopped = false;
+
+            if (stopped)
+                stoppedCaches.add(name);
+        }
+
+        cachesOnDisconnect = null;
+
+        return stoppedCaches;
+    }
+
+    void clearCaches() {
+        registeredCaches.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 92a7af3..a2e91e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -50,9 +50,6 @@ public class DynamicCacheDescriptor {
     /** Statically configured flag. */
     private boolean staticCfg;
 
-    /** Started flag. */
-    private boolean started;
-
     /** Cache type. */
     private CacheType cacheType;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 92c144c..67f25b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -237,6 +237,8 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** Start topology version. */
     private AffinityTopologyVersion startTopVer;
 
+    private AffinityTopologyVersion cacheStartTopVer;
+
     /** Dynamic cache deployment ID. */
     private IgniteUuid dynamicDeploymentId;
 
@@ -458,6 +460,10 @@ public class GridCacheContext<K, V> implements Externalizable {
         this.startTopVer = startTopVer;
     }
 
+    public void cacheStartTopologyVersion(AffinityTopologyVersion cacheStartTopVer) {
+        this.cacheStartTopVer = cacheStartTopVer;
+    }
+
     /**
      * @return Cache default {@link ExpiryPolicy}.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index fdd29e4..b9c066b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -150,12 +150,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                     DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheMsg.cacheId());
 
-                    if (cacheDesc != null) {
-                        if (cacheDesc.startTopologyVersion() != null)
-                            startTopVer = cacheDesc.startTopologyVersion();
-                        else if (cacheDesc.receivedFromStartVersion() != null)
-                            startTopVer = cacheDesc.receivedFromStartVersion();
-                    }
+                    // TODO: should be specified on request since cache desc can be removed,
+                    if (cacheDesc != null)
+                        startTopVer = cacheDesc.startTopologyVersion();
 
                     // Need to wait for exchange to avoid race between cache start and affinity request.
                     fut = cctx.exchange().affinityReadyFuture(startTopVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c73e1c90/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 4b79361..ecbf475 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -152,7 +152,6 @@ import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
 import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
 import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
 import static org.apache.ignite.configuration.DeploymentMode.SHARED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
 import static org.apache.ignite.internal.IgniteComponentType.JTA;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
@@ -191,11 +190,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Template configuration add futures. */
     private ConcurrentMap<String, IgniteInternalFuture> pendingTemplateFuts = new ConcurrentHashMap<>();
 
-    /** Dynamic caches. */
-    private ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
-
-    /** Cache templates. */
-    private ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
+    /** */
+    private ClusterCachesInfo cachesInfo;
 
     /** */
     private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>();
@@ -207,9 +203,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
 
     /** */
-    private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
-
-    /** */
     private Map<UUID, DynamicCacheChangeBatch> clientReconnectReqs;
 
     /** Internal cache names. */
@@ -389,16 +382,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param cc Cache Configuration.
      * @return {@code true} if cache is starting on client node and this node is affinity node for the cache.
      */
-    private boolean storesLocallyOnClient(IgniteConfiguration c,
-                                          CacheConfiguration cc) {
+    private boolean storesLocallyOnClient(IgniteConfiguration c, CacheConfiguration cc) {
         if (c.isClientMode() && c.getMemoryConfiguration() == null) {
             if (cc.getCacheMode() == LOCAL)
                 return true;
 
-            if (ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()))
-                return true;
+            return ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName());
 
-            return false;
         }
         else
             return false;
@@ -623,6 +613,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked"})
     @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
+        cachesInfo = new ClusterCachesInfo(ctx);
+
         DeploymentMode depMode = ctx.config().getDeploymentMode();
 
         if (!F.isEmpty(ctx.config().getCacheConfiguration())) {
@@ -643,72 +635,31 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         for (GridCacheSharedManager mgr : sharedCtx.managers())
             mgr.start(sharedCtx);
 
-        //if inActivate on start then skip registrate caches
-        if (!activeOnStart)
-            return;
+        if (activeOnStart && !ctx.config().isDaemon()) {
+            Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches = new HashMap<>();
 
-        CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
+            Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates = new HashMap<>();
 
-        registerCacheFromConfig(cfgs);
+            registerCacheFromConfig(caches, templates);
 
-        registerCacheFromPersistentStore(cfgs);
+            registerCacheFromPersistentStore(caches, templates);
 
-        if (log.isDebugEnabled())
-            log.debug("Started cache processor.");
-    }
-
-    /**
-     * @param cfgs Cache configurations.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void registerCacheFromConfig(CacheConfiguration[] cfgs) throws IgniteCheckedException {
-        for (int i = 0; i < cfgs.length; i++) {
-            if (ctx.config().isDaemon())
-                continue;
-
-            CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
-
-            cfgs[i] = cfg; // Replace original configuration value.
-
-            registerCache(cfg);
+            cachesInfo.onStart(new CacheJoinNodeDiscoveryData(IgniteUuid.randomUuid(), caches, templates));
         }
-    }
-
-    /**
-     * @param cfgs Cache configurations.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void registerCacheFromPersistentStore(CacheConfiguration[] cfgs) throws IgniteCheckedException {
-        if (sharedCtx.pageStore() != null &&
-            sharedCtx.database().persistenceEnabled() &&
-            !ctx.config().isDaemon()) {
-
-            Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames();
-
-            for (CacheConfiguration cfg : cfgs)
-                savedCacheNames.remove(cfg.getName());
-
-            for (String name : internalCaches)
-                savedCacheNames.remove(name);
-
-            if (!F.isEmpty(savedCacheNames)) {
-                log.info("Registrate persistent caches: " + savedCacheNames);
-
-                for (String name : savedCacheNames) {
-                    CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name);
 
-                    if (cfg != null)
-                        registerCache(cfg);
-                }
-            }
-        }
+        if (log.isDebugEnabled())
+            log.debug("Started cache processor.");
     }
 
     /**
      * @param cfg Cache configuration.
+     * @param caches Caches map.
+     * @param templates Templates map.
      * @throws IgniteCheckedException If failed.
      */
-    private void registerCache(CacheConfiguration<?, ?> cfg) throws IgniteCheckedException {
+    private void registerCache(CacheConfiguration cfg,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates) throws IgniteCheckedException {
         cloneCheckSerializable(cfg);
 
         CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
@@ -716,20 +667,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         // Initialize defaults.
         initialize(cfg, cacheObjCtx);
 
-        String masked = maskNull(cfg.getName());
-
-        if (cacheDescriptor(cfg.getName()) != null) {
-            String cacheName = cfg.getName();
+        boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
 
-            if (cacheName != null)
+        if (!template) {
+            if (caches.containsKey(cfg.getName())) {
                 throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
-                    "assign unique name to each cache): " + U.maskName(cacheName));
-            else
-                throw new IgniteCheckedException("Default cache has already been configured (check configuration and " +
-                    "assign unique name to each cache).");
-        }
+                    "assign unique name to each cache): " + cfg.getName());
+            }
 
-        CacheType cacheType;
+            CacheType cacheType;
 
             if (CU.isUtilityCache(cfg.getName()))
                 cacheType = CacheType.UTILITY;
@@ -738,63 +684,163 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             else
                 cacheType = CacheType.USER;
 
-        if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null)
-            cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName());
-
-        boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
-
-        DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
-            cfg,
-            cacheType,
-            template,
-            IgniteUuid.randomUuid(),
-            new QuerySchema(cfg.getQueryEntities()));
-
-        desc.locallyConfigured(true);
-        desc.staticallyConfigured(true);
-        desc.receivedFrom(ctx.localNodeId());
-
-        if (!template) {
-            cacheDescriptor(cfg.getName(), desc);
-
-            ctx.discovery().setCacheFilter(
-                cfg.getName(),
-                cfg.getNodeFilter(),
-                cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED,
-                cfg.getCacheMode());
-
-            ctx.discovery().addClientNode(cfg.getName(),
-                ctx.localNodeId(),
-                cfg.getNearConfiguration() != null);
+            if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null)
+                cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName());
 
             if (!cacheType.userCache())
                 stopSeq.addLast(cfg.getName());
             else
                 stopSeq.addFirst(cfg.getName());
+
+            caches.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, cacheType, (byte)0));
         }
-        else {
-            if (log.isDebugEnabled())
-                log.debug("Use cache configuration as template: " + cfg);
+        else
+            templates.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, CacheType.USER, (byte)0));
+    }
+
+    /**
+     * @param caches Caches map.
+     * @param templates Templates map.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void registerCacheFromConfig(
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates
+    ) throws IgniteCheckedException {
+        assert !ctx.config().isDaemon();
+
+        CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
+
+        for (int i = 0; i < cfgs.length; i++) {
+            CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
 
-            registeredTemplates.put(masked, desc);
+            registerCache(cfg, caches, templates);
         }
+    }
 
-        if (cfg.getName() == null) { // Use cache configuration with null name as template.
-            DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
-                cfg,
-                cacheType,
-                true,
-                IgniteUuid.randomUuid(),
-                new QuerySchema(cfg.getQueryEntities()));
+    /**
+     * @param caches Caches map.
+     * @param templates Templates map.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void registerCacheFromPersistentStore(
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates
+    ) throws IgniteCheckedException {
+        assert !ctx.config().isDaemon();
 
-            desc0.locallyConfigured(true);
-            desc0.staticallyConfigured(true);
+        if (sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) {
+            Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames();
 
-            registeredTemplates.put(masked, desc0);
+            savedCacheNames.removeAll(caches.keySet());
+
+            savedCacheNames.removeAll(internalCaches);
+
+            if (!F.isEmpty(savedCacheNames)) {
+                if (log.isInfoEnabled())
+                    log.info("Register persistent caches: " + savedCacheNames);
+
+                for (String name : savedCacheNames) {
+                    CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name);
+
+                    if (cfg != null)
+                        registerCache(cfg, caches, templates);
+                }
+            }
         }
     }
 
     /**
+     * @param cfg Cache configuration.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void registerCache(CacheConfiguration<?, ?> cfg) throws IgniteCheckedException {
+//        cloneCheckSerializable(cfg);
+//
+//        CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
+//
+//        // Initialize defaults.
+//        initialize(cfg, cacheObjCtx);
+//
+//        String masked = maskNull(cfg.getName());
+//
+//        if (cacheDescriptor(cfg.getName()) != null) {
+//            String cacheName = cfg.getName();
+//
+//            if (cacheName != null)
+//                throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
+//                    "assign unique name to each cache): " + U.maskName(cacheName));
+//            else
+//                throw new IgniteCheckedException("Default cache has already been configured (check configuration and " +
+//                    "assign unique name to each cache).");
+//        }
+//
+//        CacheType cacheType;
+//
+//        if (CU.isUtilityCache(cfg.getName()))
+//            cacheType = CacheType.UTILITY;
+//        else if (internalCaches.contains(maskNull(cfg.getName())))
+//            cacheType = CacheType.INTERNAL;
+//        else
+//            cacheType = CacheType.USER;
+//
+//        if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null)
+//            cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName());
+//
+//        boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
+//
+//        DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
+//            cfg,
+//            cacheType,
+//            template,
+//            IgniteUuid.randomUuid(),
+//            new QuerySchema(cfg.getQueryEntities()));
+//
+//        desc.locallyConfigured(true);
+//        desc.staticallyConfigured(true);
+//        desc.receivedFrom(ctx.localNodeId());
+//
+//        if (!template) {
+//            cacheDescriptor(cfg.getName(), desc);
+//
+//            ctx.discovery().setCacheFilter(
+//                cfg.getName(),
+//                cfg.getNodeFilter(),
+//                cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED,
+//                cfg.getCacheMode());
+//
+//            ctx.discovery().addClientNode(cfg.getName(),
+//                ctx.localNodeId(),
+//                cfg.getNearConfiguration() != null);
+//
+//            if (!cacheType.userCache())
+//                stopSeq.addLast(cfg.getName());
+//            else
+//                stopSeq.addFirst(cfg.getName());
+//        }
+//        else {
+//            if (log.isDebugEnabled())
+//                log.debug("Use cache configuration as template: " + cfg);
+//
+//            registeredTemplates.put(masked, desc);
+//        }
+//
+//        if (cfg.getName() == null) { // Use cache configuration with null name as template.
+//            DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
+//                cfg,
+//                cacheType,
+//                true,
+//                IgniteUuid.randomUuid(),
+//                new QuerySchema(cfg.getQueryEntities()));
+//
+//            desc0.locallyConfigured(true);
+//            desc0.staticallyConfigured(true);
+//
+//            registeredTemplates.put(masked, desc0);
+//        }
+    }
+
+    /**
      * Initialize internal cache names
      */
     private void initializeInternalCacheNames() {
@@ -864,54 +910,55 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             ctx.query().onCacheKernalStart();
 
             // Start dynamic caches received from collect discovery data.
-            for (DynamicCacheDescriptor desc : cacheDescriptors()) {
-                if (ctx.config().isDaemon())
-                    continue;
-
-                desc.clearRemoteConfigurations();
-
-                CacheConfiguration ccfg = desc.cacheConfiguration();
-
-                IgnitePredicate filter = ccfg.getNodeFilter();
-
-                boolean loc = desc.locallyConfigured();
-
-                if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
-                    boolean started = desc.onStart();
-
-                    assert started : "Failed to change started flag for locally configured cache: " + desc;
-
-                    CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
-
-                    CachePluginManager pluginMgr = desc.pluginManager();
-
-                    GridCacheContext ctx = createCache(
-                        ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed());
-
-                    ctx.dynamicDeploymentId(desc.deploymentId());
-
-                    sharedCtx.addCacheContext(ctx);
-
-                    GridCacheAdapter cache = ctx.cache();
-
-                    String name = ccfg.getName();
-
-                    caches.put(maskNull(name), cache);
-
-                    startCache(cache, desc.schema());
-
-                    jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
-                }
-            }
+//            for (DynamicCacheDescriptor desc : cacheDescriptors()) {
+//                if (ctx.config().isDaemon())
+//                    continue;
+//
+//                desc.clearRemoteConfigurations();
+//
+//                CacheConfiguration ccfg = desc.cacheConfiguration();
+//
+//                IgnitePredicate filter = ccfg.getNodeFilter();
+//
+//                boolean loc = desc.locallyConfigured();
+//
+//                if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
+//                    boolean started = desc.onStart();
+//
+//                    assert started : "Failed to change started flag for locally configured cache: " + desc;
+//
+//                    CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
+//
+//                    CachePluginManager pluginMgr = desc.pluginManager();
+//
+//                    GridCacheContext ctx = createCache(
+//                        ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed());
+//
+//                    ctx.dynamicDeploymentId(desc.deploymentId());
+//
+//                    sharedCtx.addCacheContext(ctx);
+//
+//                    GridCacheAdapter cache = ctx.cache();
+//
+//                    String name = ccfg.getName();
+//
+//                    caches.put(maskNull(name), cache);
+//
+//                    startCache(cache, desc.schema());
+//
+//                    jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
+//                }
+//            }
         }
         finally {
             cacheStartedLatch.countDown();
         }
 
         // Must call onKernalStart on shared managers after creation of fetched caches.
-        for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
+        for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) {
             if (sharedCtx.database() != mgr)
                 mgr.onKernalStart(false);
+        }
 
         // Escape if start active on start false
         if (!activeOnStart)
@@ -925,23 +972,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         ctx.service().onUtilityCacheStarted();
 
-        // Wait for caches in SYNC preload mode.
-        for (DynamicCacheDescriptor desc : cacheDescriptors()) {
-            CacheConfiguration cfg = desc.cacheConfiguration();
+        AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(locNode.order(), 0);
 
-            IgnitePredicate filter = cfg.getNodeFilter();
-
-            if (desc.locallyConfigured() || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
-                GridCacheAdapter cache = caches.get(maskNull(cfg.getName()));
+        for (GridCacheAdapter cache : caches.values()) {
+            CacheConfiguration cfg = cache.configuration();
 
-                if (cache != null) {
-                    if (cfg.getRebalanceMode() == SYNC) {
-                        CacheMode cacheMode = cfg.getCacheMode();
+            if (cache.context().affinityNode() &&
+                cfg.getRebalanceMode() == SYNC &&
+                startTopVer.equals(cache.context().startTopologyVersion())) {
+                CacheMode cacheMode = cfg.getCacheMode();
 
-                        if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0))
-                            cache.preloader().syncFuture().get();
-                    }
-                }
+                if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0))
+                    cache.preloader().syncFuture().get();
             }
         }
 
@@ -1031,7 +1073,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 stopCache(cache, cancel, false);
         }
 
-        registeredCaches.clear();
+        cachesInfo.clearCaches();
     }
 
     /**
@@ -1102,8 +1144,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
-        cachesOnDisconnect = new HashMap<>(registeredCaches);
-
         IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
             ctx.cluster().clientReconnectFuture(),
             "Failed to execute dynamic cache change request, client node disconnected.");
@@ -1130,9 +1170,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         sharedCtx.onDisconnected(reconnectFut);
 
-        registeredCaches.clear();
-
-        registeredTemplates.clear();
+        cachesInfo.onDisconnect();
     }
 
     /** {@inheritDoc} */
@@ -1141,24 +1179,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         GridCompoundFuture<?, ?> stopFut = null;
 
-        for (final GridCacheAdapter cache : caches.values()) {
-            String name = cache.name();
-
-            boolean stopped;
+        Set<String> stoppedCaches = cachesInfo.onReconnected();
 
-            boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name);
-
-            if (!sysCache) {
-                DynamicCacheDescriptor oldDesc = cachesOnDisconnect.get(maskNull(name));
-
-                assert oldDesc != null : "No descriptor for cache: " + name;
-
-                DynamicCacheDescriptor newDesc = cacheDescriptor(name);
-
-                stopped = newDesc == null || !oldDesc.deploymentId().equals(newDesc.deploymentId());
-            }
-            else
-                stopped = false;
+        for (final GridCacheAdapter cache : caches.values()) {
+            boolean stopped = stoppedCaches.contains(cache.name());
 
             if (stopped) {
                 cache.context().gate().reconnected(true);
@@ -1185,11 +1209,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 reconnected.add(cache);
 
-                if (!sysCache) {
+                if (cache.context().userCache()) {
                     // Re-create cache structures inside indexing in order to apply recent schema changes.
                     GridCacheContext cctx = cache.context();
 
-                    DynamicCacheDescriptor desc = cacheDescriptor(name);
+                    DynamicCacheDescriptor desc = cacheDescriptor(cctx.name());
 
                     assert desc != null;
 
@@ -1211,8 +1235,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         for (GridCacheAdapter cache : reconnected)
             cache.context().gate().reconnected(false);
 
-        cachesOnDisconnect = null;
-
         if (stopFut != null)
             stopFut.markInitialized();
 
@@ -1735,17 +1757,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Collection of started cache names.
      */
     public Collection<String> cacheNames() {
-        return F.viewReadOnly(cacheDescriptors(),
-            new IgniteClosure<DynamicCacheDescriptor, String>() {
-                @Override public String apply(DynamicCacheDescriptor desc) {
-                    return desc.cacheConfiguration().getName();
-                }
-            },
-            new IgnitePredicate<DynamicCacheDescriptor>() {
-                @Override public boolean apply(DynamicCacheDescriptor desc) {
-                    return desc.started();
-                }
-            });
+        return F.viewReadOnly(cacheDescriptors(), new IgniteClosure<DynamicCacheDescriptor, String>() {
+            @Override public String apply(DynamicCacheDescriptor desc) {
+                return desc.cacheConfiguration().getName();
+            }
+        });
     }
 
     /**
@@ -1768,7 +1784,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         }
 
         if (start) {
-            for (Map.Entry<String, DynamicCacheDescriptor> e : registeredCaches.entrySet()) {
+            for (Map.Entry<String, DynamicCacheDescriptor> e : cachesInfo.registeredCaches().entrySet()) {
                 DynamicCacheDescriptor desc = e.getValue();
 
                 CacheConfiguration ccfg = desc.cacheConfiguration();
@@ -1828,9 +1844,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName());
 
-        if (desc != null)
-            desc.onStart();
-
         prepareCacheStart(
             req.startCacheConfiguration(),
             req.nearCacheConfiguration(),
@@ -1838,48 +1851,55 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             req.clientStartOnly(),
             req.initiatingNodeId(),
             req.deploymentId(),
+            desc.startTopologyVersion(),
             topVer,
             desc != null ? desc.schema() : null
         );
     }
 
+    public void startCachesOnLocalJoin(AffinityTopologyVersion exchTopVer) throws IgniteCheckedException {
+        List<DynamicCacheDescriptor> caches = cachesInfo.cachesToStartOnLocalJoin();
+
+        for (DynamicCacheDescriptor desc : caches) {
+            prepareCacheStart(
+                desc.cacheConfiguration(),
+                null,
+                desc.cacheType(),
+                false,
+                null,
+                desc.deploymentId(),
+                desc.startTopologyVersion(),
+                exchTopVer,
+                desc.schema()
+            );
+        }
+    }
+
     /**
      * Starts statically configured caches received from remote nodes during exchange.
      *
-     * @param topVer Topology version.
+     * @param nodeId Joining node ID.
+     * @param exchTopVer Current exchange version.
      * @return Started caches descriptors.
      * @throws IgniteCheckedException If failed.
      */
-    public Collection<DynamicCacheDescriptor> startReceivedCaches(AffinityTopologyVersion topVer)
+    public Collection<DynamicCacheDescriptor> startReceivedCaches(UUID nodeId, AffinityTopologyVersion exchTopVer)
         throws IgniteCheckedException {
-        List<DynamicCacheDescriptor> started = null;
+        List<DynamicCacheDescriptor> started = cachesInfo.cachesReceivedFromJoin(nodeId);
 
-        for (DynamicCacheDescriptor desc : cacheDescriptors()) {
-            if (!desc.started() && desc.staticallyConfigured() && !desc.locallyConfigured()) {
-                if (desc.receivedFrom() != null) {
-                    AffinityTopologyVersion startVer = desc.receivedFromStartVersion();
-
-                    if (startVer == null || startVer.compareTo(topVer) > 0)
-                        continue;
-                }
-
-                if (desc.onStart()) {
-                    if (started == null)
-                        started = new ArrayList<>();
-
-                    started.add(desc);
-
-                    prepareCacheStart(
-                        desc.cacheConfiguration(),
-                        null,
-                        desc.cacheType(),
-                        false,
-                        null,
-                        desc.deploymentId(),
-                        topVer,
-                        desc.schema()
-                    );
-                }
+        if (started != null) {
+            for (DynamicCacheDescriptor desc : started) {
+                prepareCacheStart(
+                    desc.cacheConfiguration(),
+                    null,
+                    desc.cacheType(),
+                    false,
+                    null,
+                    desc.deploymentId(),
+                    desc.startTopologyVersion(),
+                    exchTopVer,
+                    desc.schema()
+                );
             }
         }
 
@@ -1893,7 +1913,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param clientStartOnly Client only start request.
      * @param initiatingNodeId Initiating node ID.
      * @param deploymentId Deployment ID.
-     * @param topVer Topology version.
+     * @param cacheStartTopVer Cache start topology version.
+     * @param exchTopVer Current exchange version.
      * @param schema Query schema.
      * @throws IgniteCheckedException If failed.
      */
@@ -1904,7 +1925,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         boolean clientStartOnly,
         UUID initiatingNodeId,
         IgniteUuid deploymentId,
-        AffinityTopologyVersion topVer,
+        AffinityTopologyVersion cacheStartTopVer,
+        AffinityTopologyVersion exchTopVer,
         @Nullable QuerySchema schema
     ) throws IgniteCheckedException {
         CacheConfiguration ccfg = new CacheConfiguration(cfg);
@@ -1916,8 +1938,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         boolean affNodeStart = !clientStartOnly && CU.affinityNode(locNode, nodeFilter);
         boolean clientNodeStart = locNode.id().equals(initiatingNodeId);
 
-        if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null)
-            return;
+        assert !caches.containsKey(ccfg.getName()) : ccfg.getName();
 
         if (affNodeStart || clientNodeStart || CU.isSystemCache(cfg.getName())) {
             if (clientNodeStart && !affNodeStart) {
@@ -1931,7 +1952,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true);
 
-            cacheCtx.startTopologyVersion(topVer);
+            cacheCtx.startTopologyVersion(exchTopVer);
+            cacheCtx.cacheStartTopologyVersion(cacheStartTopVer);
 
             cacheCtx.dynamicDeploymentId(deploymentId);
 
@@ -1950,7 +1972,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * @param req Stop request.
      */
-    public void blockGateway(DynamicCacheChangeRequest req) {
+    void blockGateway(DynamicCacheChangeRequest req) {
         assert req.stop() || req.close();
 
         if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) {
@@ -2127,263 +2149,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
-        dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId()));
+        dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), cachesInfo.joinDiscoveryData());
     }
 
     /** {@inheritDoc} */
     @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
-        dataBag.addNodeSpecificData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId()));
-    }
-
-    /**
-     * @param joiningNodeId Joining node id.
-     */
-    private Serializable getDiscoveryData(UUID joiningNodeId) {
-        boolean reconnect = ctx.localNodeId().equals(joiningNodeId) && cachesOnDisconnect != null;
-
-        // Collect dynamically started caches to a single object.
-        Collection<DynamicCacheChangeRequest> reqs;
-
-        Map<String, Map<UUID, Boolean>> clientNodesMap;
-
-        if (reconnect) {
-            reqs = new ArrayList<>(caches.size() + 1);
-
-            clientNodesMap = U.newHashMap(caches.size());
-
-            collectDataOnReconnectingNode(reqs, clientNodesMap, joiningNodeId);
-        }
-        else {
-            reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size() + 1);
-
-            clientNodesMap = ctx.discovery().clientNodesMap();
-
-            collectDataOnGridNode(reqs);
-        }
-
-        DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs);
-
-        batch.clientNodes(clientNodesMap);
-
-        batch.clientReconnect(reconnect);
-
-        // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same.
-        batch.id(null);
-
-        return batch;
-    }
-
-    /**
-     * @param reqs requests.
-     */
-    private void collectDataOnGridNode(Collection<DynamicCacheChangeRequest> reqs) {
-        for (DynamicCacheDescriptor desc : cacheDescriptors()) {
-            // RequestId must be null because on different node will be different byte [] and
-            // we get duplicate discovery data, for more details see
-            // TcpDiscoveryNodeAddedMessage#addDiscoveryData.
-            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, desc.cacheConfiguration().getName(),
-                null);
-
-            req.startCacheConfiguration(desc.cacheConfiguration());
-            req.cacheType(desc.cacheType());
-            req.deploymentId(desc.deploymentId());
-            req.receivedFrom(desc.receivedFrom());
-            req.schema(desc.schema());
-
-            reqs.add(req);
-        }
-
-        for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
-            // RequestId must be null because on different node will be different byte [] and
-            // we get duplicate discovery data, for more details see
-            // TcpDiscoveryNodeAddedMessage#addDiscoveryData.
-            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, desc.cacheConfiguration().getName(),
-                null);
-
-            req.startCacheConfiguration(desc.cacheConfiguration());
-            req.schema(desc.schema());
-
-            req.template(true);
-
-            reqs.add(req);
-        }
-    }
-
-    /**
-     * @param reqs requests.
-     * @param clientNodesMap Client nodes map.
-     * @param nodeId Node id.
-     */
-    private void collectDataOnReconnectingNode(
-            Collection<DynamicCacheChangeRequest> reqs,
-            Map<String, Map<UUID, Boolean>> clientNodesMap,
-            UUID nodeId
-    ) {
-        for (GridCacheAdapter<?, ?> cache : caches.values()) {
-            DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name()));
-
-            if (desc == null)
-                continue;
-
-            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, cache.name(), null);
-
-            req.startCacheConfiguration(desc.cacheConfiguration());
-            req.cacheType(desc.cacheType());
-            req.deploymentId(desc.deploymentId());
-            req.receivedFrom(desc.receivedFrom());
-            req.schema(desc.schema());
-
-            reqs.add(req);
-
-            Boolean nearEnabled = cache.isNear();
-
-            Map<UUID, Boolean> map = U.newHashMap(1);
-
-            map.put(nodeId, nearEnabled);
-
-            clientNodesMap.put(cache.name(), map);
-        }
+        if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal()))
+            dataBag.addGridCommonData(CACHE_PROC.ordinal(), cachesInfo.collectCommonDiscoveryData());
     }
 
     /** {@inheritDoc} */
     @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
-        if (data.hasJoiningNodeData()) {
-            Serializable joiningNodeData = data.joiningNodeData();
-            if (joiningNodeData instanceof DynamicCacheChangeBatch)
-                onDiscoDataReceived(
-                        data.joiningNodeId(),
-                        data.joiningNodeId(),
-                        (DynamicCacheChangeBatch) joiningNodeData, true);
-        }
+        cachesInfo.onJoiningNodeDataReceived(data);
     }
 
     /** {@inheritDoc} */
     @Override public void onGridDataReceived(GridDiscoveryData data) {
-        Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
-
-        if (nodeSpecData != null) {
-            for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet()) {
-                if (e.getValue() != null && e.getValue() instanceof DynamicCacheChangeBatch) {
-                    DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch) e.getValue();
-
-                    onDiscoDataReceived(data.joiningNodeId(), e.getKey(), batch, false);
-                }
-            }
-        }
-    }
-
-    /**
-     * @param joiningNodeId Joining node id.
-     * @param rmtNodeId Rmt node id.
-     * @param batch Batch.
-     * @param join Whether this is data from joining node.
-     */
-    private void onDiscoDataReceived(UUID joiningNodeId, UUID rmtNodeId, DynamicCacheChangeBatch batch, boolean join) {
-        if (batch.clientReconnect()) {
-            if (ctx.clientDisconnected()) {
-                if (clientReconnectReqs == null)
-                    clientReconnectReqs = new LinkedHashMap<>();
-
-                clientReconnectReqs.put(joiningNodeId, batch);
-
-                return;
-            }
-
-            processClientReconnectData(joiningNodeId, batch);
-        }
-        else {
-            for (DynamicCacheChangeRequest req : batch.requests()) {
-                initReceivedCacheConfiguration(req);
-
-                if (req.template()) {
-                    CacheConfiguration ccfg = req.startCacheConfiguration();
-
-                    assert ccfg != null : req;
-
-                    DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName()));
-
-                    if (existing == null) {
-                        DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
-                                ctx,
-                                ccfg,
-                                req.cacheType(),
-                                true,
-                                req.deploymentId(),
-                                req.schema());
-
-                        registeredTemplates.put(maskNull(req.cacheName()), desc);
-                    }
-
-                    continue;
-                }
-
-                DynamicCacheDescriptor existing = cacheDescriptor(req.cacheName());
-
-                if (req.start() && !req.clientStartOnly()) {
-                    CacheConfiguration ccfg = req.startCacheConfiguration();
-
-                    if (existing != null) {
-                        if (joiningNodeId.equals(ctx.localNodeId())) {
-                            existing.receivedFrom(req.receivedFrom());
-                            existing.deploymentId(req.deploymentId());
-                        }
-
-                        if (existing.locallyConfigured()) {
-                            existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
-
-                            if (!join)
-                                // Overwrite existing with remote.
-                                existing.schema(req.schema());
-
-                            ctx.discovery().setCacheFilter(
-                                    req.cacheName(),
-                                    ccfg.getNodeFilter(),
-                                    ccfg.getNearConfiguration() != null,
-                                    ccfg.getCacheMode());
-                        }
-                    }
-                    else {
-                        assert req.cacheType() != null : req;
-
-                        DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
-                                ctx,
-                                ccfg,
-                                req.cacheType(),
-                                false,
-                                req.deploymentId(),
-                                req.schema());
-
-                        // Received statically configured cache.
-                        if (req.initiatingNodeId() == null)
-                            desc.staticallyConfigured(true);
-
-                        if (joiningNodeId.equals(ctx.localNodeId()))
-                            desc.receivedOnDiscovery(true);
-
-                        desc.receivedFrom(req.receivedFrom());
-
-                        DynamicCacheDescriptor old = cacheDescriptor(req.cacheName(), desc);
-
-                        assert old == null : old;
-
-                        ctx.discovery().setCacheFilter(
-                                req.cacheName(),
-                                ccfg.getNodeFilter(),
-                                ccfg.getNearConfiguration() != null,
-                                ccfg.getCacheMode());
-                    }
-                }
-            }
-
-            if (!F.isEmpty(batch.clientNodes())) {
-                for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) {
-                    String cacheName = entry.getKey();
-
-                    for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
-                        ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
-                }
-            }
-        }
+        cachesInfo.onGridDataReceived(data);
     }
 
     /**
@@ -2469,7 +2251,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         List<CacheConfiguration> wildcardNameCfgs = null;
 
-        for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
+        for (DynamicCacheDescriptor desc : cachesInfo.registeredTemplates().values()) {
             assert desc.template();
 
             CacheConfiguration cfg = desc.cacheConfiguration();
@@ -2744,7 +2526,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         checkEmptyTransactions();
 
         if (F.isEmpty(cacheNames))
-            cacheNames = registeredCaches.keySet();
+            cacheNames = cachesInfo.registeredCaches().keySet();
 
         Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheNames.size());
 
@@ -2965,12 +2747,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param topVer Topology version.
      */
     public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
-        if (type == EVT_NODE_JOINED) {
-            for (DynamicCacheDescriptor cacheDesc : cacheDescriptors()) {
-                if (node.id().equals(cacheDesc.receivedFrom()))
-                    cacheDesc.receivedFromStartVersion(topVer);
-            }
-        }
+        cachesInfo.onDiscoveryEvent(type, node, topVer);
 
         sharedCtx.affinity().onDiscoveryEvent(type, node, topVer);
     }
@@ -2997,11 +2774,28 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             return true;
 
         if (msg instanceof DynamicCacheChangeBatch)
-            return onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer);
+            return cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer);
 
         return false;
     }
 
+    void completeTemplateAddFuture(String name, IgniteUuid deploymentId) {
+        GridCacheProcessor.TemplateConfigurationFuture fut =
+            (GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(name);
+
+        if (fut != null && fut.deploymentId().equals(deploymentId))
+            fut.onDone();
+    }
+
+    void completeCacheStartFuture(DynamicCacheChangeRequest req, Exception err) {
+        if (ctx.localNodeId().equals(req.initiatingNodeId())) {
+            DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
+
+            if (fut != null && F.eq(req.deploymentId(), fut.deploymentId()))
+                fut.onDone(err);
+        }
+    }
+
     /**
      * @param batch Change request batch.
      * @param topVer Current topology version.
@@ -3023,13 +2817,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 assert ccfg != null : req;
 
-                DynamicCacheDescriptor desc = registeredTemplates.get(maskNull(req.cacheName()));
+                DynamicCacheDescriptor desc = cachesInfo.registeredTemplates().get(maskNull(req.cacheName()));
 
                 if (desc == null) {
                     DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), true,
                         req.deploymentId(), req.schema());
 
-                    DynamicCacheDescriptor old = registeredTemplates.put(maskNull(ccfg.getName()), templateDesc);
+                    DynamicCacheDescriptor old = cachesInfo.registeredTemplates().put(maskNull(ccfg.getName()), templateDesc);
 
                     assert old == null :
                         "Dynamic cache map was concurrently modified [new=" + templateDesc + ", old=" + old + ']';
@@ -3080,7 +2874,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                         startDesc.startTopologyVersion(newTopVer);
 
-                        DynamicCacheDescriptor old = cacheDescriptor(ccfg.getName(), startDesc);
+                        // TODO
+                        DynamicCacheDescriptor old = null;//cacheDescriptor(ccfg.getName(), startDesc);
 
                         assert old == null :
                             "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']';
@@ -3152,7 +2947,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 if (desc != null) {
                     if (req.stop()) {
-                        DynamicCacheDescriptor old = registeredCaches.remove(maskNull(req.cacheName()));
+                        DynamicCacheDescriptor old = cachesInfo.registeredCaches().remove(maskNull(req.cacheName()));
 
                         assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
 
@@ -3610,25 +3405,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Descriptor.
      */
     public DynamicCacheDescriptor cacheDescriptor(String name) {
-        return registeredCaches.get(maskNull(name));
-    }
-
-    /**
-     * Put registered cache descriptor.
-     *
-     * @param name Name.
-     * @param desc Descriptor.
-     * @return Old descriptor (if any).
-     */
-    private DynamicCacheDescriptor cacheDescriptor(String name, DynamicCacheDescriptor desc) {
-        return registeredCaches.put(maskNull(name), desc);
+        return cachesInfo.registeredCaches().get(name);
     }
 
     /**
      * @return Cache descriptors.
      */
     public Collection<DynamicCacheDescriptor> cacheDescriptors() {
-        return registeredCaches.values();
+        return cachesInfo.registeredCaches().values();
     }
 
     /**
@@ -3655,7 +3439,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     public void addCacheConfiguration(CacheConfiguration cacheCfg) throws IgniteCheckedException {
         String masked = maskNull(cacheCfg.getName());
 
-        DynamicCacheDescriptor desc = registeredTemplates.get(masked);
+        DynamicCacheDescriptor desc = cachesInfo.registeredTemplates().get(masked);
 
         if (desc != null)
             return;
@@ -3833,7 +3617,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException In case of error.
      */
     public void createMissingQueryCaches() throws IgniteCheckedException {
-        for (Map.Entry<String, DynamicCacheDescriptor> e : registeredCaches.entrySet()) {
+        for (Map.Entry<String, DynamicCacheDescriptor> e : cachesInfo.registeredCaches().entrySet()) {
             DynamicCacheDescriptor desc = e.getValue();
 
             if (isMissingQueryCache(desc))