You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2019/08/09 13:51:38 UTC

[ignite] branch master updated: IGNITE-9562 Destroyed cache that resurrected on an old offline node breaks PME - Fixes #6748.

This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 27e9f70  IGNITE-9562 Destroyed cache that resurrected on an old offline node breaks PME - Fixes #6748.
27e9f70 is described below

commit 27e9f705c1f65baae20b7dc3c03e988217dbe3f6
Author: Eduard Shangareev <ed...@gmail.com>
AuthorDate: Fri Aug 9 16:50:45 2019 +0300

    IGNITE-9562 Destroyed cache that resurrected on an old offline node breaks PME - Fixes #6748.
    
    Signed-off-by: Ivan Rakov <ir...@apache.org>
---
 .../cache/CacheJoinNodeDiscoveryData.java          |   7 +
 .../internal/processors/cache/CachesRegistry.java  |  33 +-
 .../processors/cache/ClusterCachesInfo.java        |  70 +-
 .../cache/DynamicCacheChangeRequest.java           |  15 +-
 .../processors/cache/GridCacheProcessor.java       | 989 +++------------------
 .../processors/cache/GridLocalConfigManager.java   | 333 +++++++
 .../cache/ValidationOnNodeJoinUtils.java           | 746 ++++++++++++++++
 .../distributed/CacheBaselineTopologyTest.java     |   6 +-
 .../persistence/IgnitePdsDestroyCacheTest.java     |   3 +
 .../GridMarshallerMappingConsistencyTest.java      |   2 +-
 .../db/IgniteCacheGroupsWithRestartsTest.java      | 118 ++-
 11 files changed, 1382 insertions(+), 940 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
index 4ab3266..9611899 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
@@ -163,6 +163,13 @@ public class CacheJoinNodeDiscoveryData implements Serializable {
         }
 
         /**
+         * @return Long which bits represent some flags.
+         */
+        public long getFlags() {
+            return flags;
+        }
+
+        /**
          * @param ois ObjectInputStream.
          */
         private void readObject(ObjectInputStream ois)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
index 3b906ca..6010cc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
@@ -31,6 +31,7 @@ import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
@@ -257,7 +258,11 @@ public class CachesRegistry {
         if (cachesToPersist.isEmpty())
             return cachesConfPersistFuture = new GridFinishedFuture<>();
 
-        return cachesConfPersistFuture = persistCacheConfigurations(cachesToPersist);
+        List<StoredCacheData> cacheConfigsToPersist = cacheDescriptors.stream()
+            .map(desc -> desc.toStoredData(cctx.cache().splitter()))
+            .collect(Collectors.toList());
+
+        return cachesConfPersistFuture = persistCacheConfigurations(cacheConfigsToPersist);
     }
 
     /**
@@ -273,16 +278,12 @@ public class CachesRegistry {
     }
 
     /**
-     * Persists cache configurations from given {@code cacheDescriptors}.
+     * Persists cache configurations.
      *
-     * @param cacheDescriptors Cache descriptors to retrieve cache configurations.
+     * @param cacheConfigsToPersist Cache configurations to persist.
      * @return Future that will be completed when all cache configurations will be persisted to cache work directory.
      */
-    private IgniteInternalFuture<?> persistCacheConfigurations(List<DynamicCacheDescriptor> cacheDescriptors) {
-        List<StoredCacheData> cacheConfigsToPersist = cacheDescriptors.stream()
-            .map(desc -> desc.toStoredData(cctx.cache().splitter()))
-            .collect(Collectors.toList());
-
+    private IgniteInternalFuture<?> persistCacheConfigurations(List<StoredCacheData> cacheConfigsToPersist) {
         // Pre-create cache work directories if they don't exist.
         for (StoredCacheData data : cacheConfigsToPersist) {
             try {
@@ -297,13 +298,15 @@ public class CachesRegistry {
             }
         }
 
-        return cctx.kernalContext().closure().runLocalSafe(() -> {
-            try {
-                for (StoredCacheData data : cacheConfigsToPersist)
-                    cctx.pageStore().storeCacheData(data, false);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Error while saving cache configurations on disk", e);
+        return cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
+            @Override public void run() {
+                try {
+                    for (StoredCacheData data : cacheConfigsToPersist)
+                        cctx.cache().saveCacheConfiguration(data, false);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Error while saving cache configurations on disk", e);
+                }
             }
         });
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 3df638d..13ef63c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -33,6 +33,7 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheExistsException;
@@ -151,15 +152,13 @@ class ClusterCachesInfo {
      * Filters all dynamic cache descriptors and groups that were not presented on node start
      * and were received with grid discovery data.
      *
-     * @param localConfigData node's local cache configurations
-     * (both from static config and stored with persistent caches).
-     *
+     * @param localCachesOnStart Caches which were already presented on node start.
      */
-    public void filterDynamicCacheDescriptors(CacheJoinNodeDiscoveryData localConfigData) {
+    public void filterDynamicCacheDescriptors(Set<String> localCachesOnStart) {
         if (ctx.isDaemon())
             return;
 
-        filterRegisteredCachesAndCacheGroups(localConfigData.caches());
+        filterRegisteredCachesAndCacheGroups(localCachesOnStart);
 
         List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches = locJoinCachesCtx.caches();
 
@@ -183,14 +182,14 @@ class ClusterCachesInfo {
      *
      * @param locCaches Caches from local node configuration (static configuration and persistent caches).
      */
-    private void filterRegisteredCachesAndCacheGroups(Map<String, CacheJoinNodeDiscoveryData.CacheInfo> locCaches) {
+    private void filterRegisteredCachesAndCacheGroups(Set<String> locCaches) {
         //filter registered caches
         Iterator<Map.Entry<String, DynamicCacheDescriptor>> cachesIter = registeredCaches.entrySet().iterator();
 
         while (cachesIter.hasNext()) {
             Map.Entry<String, DynamicCacheDescriptor> e = cachesIter.next();
 
-            if (!locCaches.containsKey(e.getKey())) {
+            if (!locCaches.contains(e.getKey())) {
                 cachesIter.remove();
 
                 ctx.discovery().removeCacheFilter(e.getKey());
@@ -1150,23 +1149,23 @@ class ClusterCachesInfo {
 
     /**
      * @param dataBag Discovery data bag.
+     * @param splitter Cache configuration splitter.
      */
-    public void collectGridNodeData(DiscoveryDataBag dataBag) {
+    public void collectGridNodeData(DiscoveryDataBag dataBag, CacheConfigurationSplitter splitter) {
         if (ctx.isDaemon())
             return;
 
         if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal()))
-            dataBag.addGridCommonData(CACHE_PROC.ordinal(), collectCommonDiscoveryData());
+            dataBag.addGridCommonData(CACHE_PROC.ordinal(), collectCommonDiscoveryData(splitter));
     }
 
     /**
      * @return Information about started caches.
+     * @param cfgSplitter Cache configuration splitter.
      */
-    private CacheNodeCommonDiscoveryData collectCommonDiscoveryData() {
+    private CacheNodeCommonDiscoveryData collectCommonDiscoveryData(CacheConfigurationSplitter cfgSplitter) {
         Map<Integer, CacheGroupData> cacheGrps = new HashMap<>();
 
-        CacheConfigurationSplitter cfgSplitter = ctx.cache().backwardCompatibleSplitter();
-
         for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) {
             T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = cfgSplitter.split(grpDesc);
 
@@ -1789,6 +1788,53 @@ class ClusterCachesInfo {
     }
 
     /**
+     * @param data Joining node data.
+     * @return Message with error or null if everything was OK.
+     */
+    public String validateJoiningNodeData(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
+        if (data.hasJoiningNodeData()) {
+            Serializable joiningNodeData = data.joiningNodeData();
+
+            if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) {
+                CacheJoinNodeDiscoveryData joinData = (CacheJoinNodeDiscoveryData)joiningNodeData;
+
+                Set<String> problemCaches = null;
+
+                for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.caches().values()) {
+                    CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config();
+
+                    if (!registeredCaches.containsKey(cfg.getName())) {
+                        String conflictErr = checkCacheConflict(cfg);
+
+                        if (conflictErr != null) {
+                            U.warn(log, "Ignore cache received from joining node. " + conflictErr);
+
+                            continue;
+                        }
+
+                        long flags = cacheInfo.getFlags();
+
+                        if (flags == 1L) {
+                            if (problemCaches == null)
+                                problemCaches = new HashSet<>();
+
+                            problemCaches.add(cfg.getName());
+                        }
+                    }
+                }
+
+                if (!F.isEmpty(problemCaches))
+                    return problemCaches.stream().collect(Collectors.joining(", ",
+                        "Joining node has caches with data which are not presented on cluster, " +
+                            "it could mean that they were already destroyed, to add the node to cluster - " +
+                            "remove directories with the caches[", "]"));
+            }
+        }
+
+        return  null;
+    }
+
+    /**
      * @param clientData Discovery data.
      * @param clientNodeId Client node ID.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 3032dc8..231d843 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -134,21 +134,22 @@ public class DynamicCacheChangeRequest implements Serializable {
     /**
      * @param ctx Context.
      * @param cfg0 Template configuration.
+     * @param splitCfg Cache configuration splitter.
      * @return Request to add template.
      */
-    static DynamicCacheChangeRequest addTemplateRequest(GridKernalContext ctx, CacheConfiguration<?, ?> cfg0) {
-        CacheConfiguration<?, ?> cfg = new CacheConfiguration<>(cfg0);
-
-        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cfg.getName(), ctx.localNodeId());
+    static DynamicCacheChangeRequest addTemplateRequest(
+        GridKernalContext ctx,
+        CacheConfiguration<?, ?> cfg0,
+        T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg
+    ) {
+        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cfg0.getName(), ctx.localNodeId());
 
         req.template(true);
 
-        T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = ctx.cache().backwardCompatibleSplitter().split(cfg);
-
         req.startCacheConfiguration(splitCfg.get1());
         req.cacheConfigurationEnrichment(splitCfg.get2());
 
-        req.schema(new QuerySchema(cfg.getQueryEntities()));
+        req.schema(new QuerySchema(cfg0.getQueryEntities()));
         req.deploymentId(IgniteUuid.randomUuid());
 
         return req;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 7ba8ae3..8eb6164 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -20,12 +20,10 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -36,12 +34,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import javax.cache.configuration.FactoryBuilder;
-import javax.cache.expiry.EternalExpiryPolicy;
-import javax.cache.expiry.ExpiryPolicy;
 import javax.management.MBeanServer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCompute;
@@ -49,24 +45,18 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheExistsException;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityFunctionContext;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreSessionListener;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataPageEvictionMode;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.DeploymentMode;
-import org.apache.ignite.configuration.DiskPageCompression;
 import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.events.EventType;
@@ -76,7 +66,6 @@ import org.apache.ignite.internal.IgniteComponentType;
 import org.apache.ignite.internal.IgniteFeatures;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.IgniteTransactionsEx;
 import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
@@ -90,7 +79,6 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
-import org.apache.ignite.internal.processors.cache.CacheJoinNodeDiscoveryData.CacheInfo;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCache;
@@ -146,8 +134,6 @@ import org.apache.ignite.internal.processors.query.schema.SchemaExchangeWorkerTa
 import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask;
 import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage;
 import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
-import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.processors.service.GridServiceProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
@@ -177,7 +163,6 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.lifecycle.LifecycleAware;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.MarshallerUtils;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
 import org.apache.ignite.mxbean.IgniteMBeanAware;
 import org.apache.ignite.plugin.security.SecurityException;
@@ -186,9 +171,6 @@ import org.apache.ignite.spi.IgniteNodeValidationResult;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
-import org.apache.ignite.spi.encryption.EncryptionSpi;
-import org.apache.ignite.spi.indexing.IndexingSpi;
-import org.apache.ignite.spi.indexing.noop.NoopIndexingSpi;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -200,23 +182,16 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
-import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
-import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
 import static org.apache.ignite.configuration.DeploymentMode.SHARED;
 import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
 import static org.apache.ignite.internal.IgniteComponentType.JTA;
 import static org.apache.ignite.internal.IgniteFeatures.TRANSACTION_OWNER_THREAD_DUMP_PROVIDING;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG;
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isDefaultDataRegionPersistent;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistentCache;
-import static org.apache.ignite.internal.processors.security.SecurityUtils.nodeSecurityContext;
+import static org.apache.ignite.internal.processors.cache.ValidationOnNodeJoinUtils.validateHashIdResolvers;
 import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;
 
 /**
@@ -224,15 +199,6 @@ import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;
  */
 @SuppressWarnings({"unchecked", "TypeMayBeWeakened", "deprecation"})
 public class GridCacheProcessor extends GridProcessorAdapter {
-    /** Template of message of conflicts during configuration merge*/
-    private static final String MERGE_OF_CONFIG_CONFLICTS_MESSAGE =
-        "Conflicts during configuration merge for cache '%s' : \n%s";
-
-    /** Template of message of node join was fail because it requires to merge of config */
-    private static final String MERGE_OF_CONFIG_REQUIRED_MESSAGE = "Failed to join node to the active cluster " +
-        "(the config of the cache '%s' has to be merged which is impossible on active grid). " +
-        "Deactivate grid and retry node join or clean the joining node.";
-
     /** Invalid region configuration message. */
     private static final String INVALID_REGION_CONFIGURATION_MESSAGE = "Failed to join node " +
         "(Incompatible data region configuration [region=%s, locNodeId=%s, isPersistenceEnabled=%s, rmtNodeId=%s, isPersistenceEnabled=%s])";
@@ -247,10 +213,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** */
     private static final String CACHE_NAMES_AND_OPERATION_FORMAT = "[cacheNames=%s, operation=%s]";
 
-    /** */
-    private final boolean startClientCaches =
-        IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN, false);
-
     /** Enables start caches in parallel. */
     private final boolean IGNITE_ALLOW_START_CACHES_IN_PARALLEL =
         IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, true);
@@ -265,6 +227,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** */
     private final ConcurrentMap<Integer, CacheGroupContext> cacheGrps = new ConcurrentHashMap<>();
 
+    /** Flag that caches were already filtered out. */
+    private final AtomicBoolean alreadyFiltered = new AtomicBoolean();
+
     /** */
     private final Map<String, GridCacheAdapter<?, ?>> caches;
 
@@ -274,9 +239,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Map of proxies. */
     private final ConcurrentHashMap<String, IgniteCacheProxyImpl<?, ?>> jCacheProxies;
 
-    /** Caches stop sequence. */
-    private final Deque<String> stopSeq;
-
     /** Transaction interface implementation. */
     private IgniteTransactionsImpl transactions;
 
@@ -297,6 +259,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     private ClusterCachesInfo cachesInfo;
 
     /** */
+    private GridLocalConfigManager locCfgMgr;
+
+    /** */
     private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>();
 
     /** Must use JDK marsh since it is used by discovery to fire custom events. */
@@ -320,9 +285,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Tmp storage for meta migration. */
     private MetaStorage.TmpStorage tmpStorage;
 
-    /** Node's local cache configurations (both from static configuration and from persistent caches). */
-    private CacheJoinNodeDiscoveryData localConfigs;
-
     /** Cache configuration splitter. */
     private CacheConfigurationSplitter splitter;
 
@@ -337,7 +299,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         caches = new ConcurrentHashMap<>();
         jCacheProxies = new ConcurrentHashMap<>();
-        stopSeq = new LinkedList<>();
         internalCaches = new HashSet<>();
 
         marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
@@ -346,19 +307,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param cfg Initializes cache configuration with proper defaults.
-     * @param cacheObjCtx Cache object context.
-     * @throws IgniteCheckedException If configuration is not valid.
-     */
-    private void initialize(CacheConfiguration cfg, CacheObjectContext cacheObjCtx)
-        throws IgniteCheckedException {
-        CU.initializeConfigDefaults(log, cfg, cacheObjCtx);
-
-        ctx.coordinators().preProcessCacheConfiguration(cfg);
-        ctx.igfsHelper().preProcessCacheConfiguration(cfg);
-    }
-
-    /**
      * @param cfg Configuration to check for possible performance issues.
      * @param hasStore {@code True} if store is configured.
      */
@@ -487,241 +435,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param c Ignite Configuration.
-     * @param cc Cache Configuration.
-     * @return {@code true} if cache is starting on client node and this node is affinity node for the cache.
-     */
-    private boolean storesLocallyOnClient(IgniteConfiguration c, CacheConfiguration cc) {
-        if (c.isClientMode() && c.getDataStorageConfiguration() == null) {
-            if (cc.getCacheMode() == LOCAL)
-                return true;
-
-            return ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName());
-
-        }
-        else
-            return false;
-    }
-
-    /**
-     * @param c Ignite configuration.
-     * @param cc Configuration to validate.
-     * @param cacheType Cache type.
-     * @param cfgStore Cache store.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void validate(IgniteConfiguration c,
-        CacheConfiguration cc,
-        CacheType cacheType,
-        @Nullable CacheStore cfgStore) throws IgniteCheckedException {
-        assertParameter(cc.getName() != null && !cc.getName().isEmpty(), "name is null or empty");
-
-        if (cc.getCacheMode() == REPLICATED) {
-            if (cc.getNearConfiguration() != null &&
-                ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) {
-                U.warn(log, "Near cache cannot be used with REPLICATED cache, " +
-                    "will be ignored [cacheName=" + U.maskName(cc.getName()) + ']');
-
-                cc.setNearConfiguration(null);
-            }
-        }
-
-        if (storesLocallyOnClient(c, cc))
-            throw new IgniteCheckedException("DataRegion for client caches must be explicitly configured " +
-                "on client node startup. Use DataStorageConfiguration to configure DataRegion.");
-
-        if (cc.getCacheMode() == LOCAL && !cc.getAffinity().getClass().equals(LocalAffinityFunction.class))
-            U.warn(log, "AffinityFunction configuration parameter will be ignored for local cache [cacheName=" +
-                U.maskName(cc.getName()) + ']');
-
-        if (cc.getAffinity().partitions() > CacheConfiguration.MAX_PARTITIONS_COUNT)
-            throw new IgniteCheckedException("Cannot have more than " + CacheConfiguration.MAX_PARTITIONS_COUNT +
-                " partitions [cacheName=" + cc.getName() + ", partitions=" + cc.getAffinity().partitions() + ']');
-
-        if (cc.getRebalanceMode() != CacheRebalanceMode.NONE) {
-            assertParameter(cc.getRebalanceBatchSize() > 0, "rebalanceBatchSize > 0");
-            assertParameter(cc.getRebalanceTimeout() >= 0, "rebalanceTimeout >= 0");
-            assertParameter(cc.getRebalanceThrottle() >= 0, "rebalanceThrottle >= 0");
-            assertParameter(cc.getRebalanceBatchesPrefetchCount() > 0, "rebalanceBatchesPrefetchCount > 0");
-        }
-
-        if (cc.getCacheMode() == PARTITIONED || cc.getCacheMode() == REPLICATED) {
-            if (cc.getAtomicityMode() == ATOMIC && cc.getWriteSynchronizationMode() == FULL_ASYNC)
-                U.warn(log, "Cache write synchronization mode is set to FULL_ASYNC. All single-key 'put' and " +
-                    "'remove' operations will return 'null', all 'putx' and 'removex' operations will return" +
-                    " 'true' [cacheName=" + U.maskName(cc.getName()) + ']');
-        }
-
-        DeploymentMode depMode = c.getDeploymentMode();
-
-        if (c.isPeerClassLoadingEnabled() && (depMode == PRIVATE || depMode == ISOLATED) &&
-            !CU.isSystemCache(cc.getName()) && !(c.getMarshaller() instanceof BinaryMarshaller))
-            throw new IgniteCheckedException("Cache can be started in PRIVATE or ISOLATED deployment mode only when" +
-                " BinaryMarshaller is used [depMode=" + ctx.config().getDeploymentMode() + ", marshaller=" +
-                c.getMarshaller().getClass().getName() + ']');
-
-        if (cc.getAffinity().partitions() > CacheConfiguration.MAX_PARTITIONS_COUNT)
-            throw new IgniteCheckedException("Affinity function must return at most " +
-                CacheConfiguration.MAX_PARTITIONS_COUNT + " partitions [actual=" + cc.getAffinity().partitions() +
-                ", affFunction=" + cc.getAffinity() + ", cacheName=" + cc.getName() + ']');
-
-        if (cc.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT) {
-            assertParameter(cc.getCacheMode() != LOCAL,
-                "LOCAL cache mode cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-
-            assertParameter(cc.getNearConfiguration() == null,
-                "near cache cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-
-            assertParameter(!cc.isReadThrough(),
-                "readThrough cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-
-            assertParameter(!cc.isWriteThrough(),
-                "writeThrough cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-
-            assertParameter(!cc.isWriteBehindEnabled(),
-                "writeBehindEnabled cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-
-            assertParameter(cc.getRebalanceMode() != NONE,
-                "Rebalance mode NONE cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-
-            ExpiryPolicy expPlc = null;
-
-            if (cc.getExpiryPolicyFactory() instanceof FactoryBuilder.SingletonFactory)
-                expPlc = (ExpiryPolicy)cc.getExpiryPolicyFactory().create();
-
-            if (!(expPlc instanceof EternalExpiryPolicy)) {
-                assertParameter(cc.getExpiryPolicyFactory() == null,
-                    "expiry policy cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-            }
-
-            assertParameter(cc.getInterceptor() == null,
-                "interceptor cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-
-            // Disable in-memory evictions for mvcc cache. TODO IGNITE-10738
-            String memPlcName = cc.getDataRegionName();
-            DataRegion dataRegion = sharedCtx.database().dataRegion(memPlcName);
-
-            if (dataRegion != null && !dataRegion.config().isPersistenceEnabled() &&
-                dataRegion.config().getPageEvictionMode() != DataPageEvictionMode.DISABLED) {
-                throw new IgniteCheckedException("Data pages evictions cannot be used with TRANSACTIONAL_SNAPSHOT " +
-                    "cache atomicity mode for in-memory regions. Please, either disable evictions or enable " +
-                    "persistence for data regions with TRANSACTIONAL_SNAPSHOT caches. [cacheName=" + cc.getName() +
-                    ", dataRegionName=" + memPlcName + ", pageEvictionMode=" +
-                    dataRegion.config().getPageEvictionMode() + ']');
-            }
-
-            IndexingSpi idxSpi = ctx.config().getIndexingSpi();
-
-            assertParameter(idxSpi == null || idxSpi instanceof NoopIndexingSpi,
-                "Custom IndexingSpi cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-        }
-
-        if (cc.isWriteBehindEnabled() && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) {
-            if (cfgStore == null)
-                throw new IgniteCheckedException("Cannot enable write-behind (writer or store is not provided) " +
-                    "for cache: " + U.maskName(cc.getName()));
-
-            assertParameter(cc.getWriteBehindBatchSize() > 0, "writeBehindBatchSize > 0");
-            assertParameter(cc.getWriteBehindFlushSize() >= 0, "writeBehindFlushSize >= 0");
-            assertParameter(cc.getWriteBehindFlushFrequency() >= 0, "writeBehindFlushFrequency >= 0");
-            assertParameter(cc.getWriteBehindFlushThreadCount() > 0, "writeBehindFlushThreadCount > 0");
-
-            if (cc.getWriteBehindFlushSize() == 0 && cc.getWriteBehindFlushFrequency() == 0)
-                throw new IgniteCheckedException("Cannot set both 'writeBehindFlushFrequency' and " +
-                    "'writeBehindFlushSize' parameters to 0 for cache: " + U.maskName(cc.getName()));
-        }
-
-        if (cc.isReadThrough() && cfgStore == null
-            && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()))
-            throw new IgniteCheckedException("Cannot enable read-through (loader or store is not provided) " +
-                "for cache: " + U.maskName(cc.getName()));
-
-        if (cc.isWriteThrough() && cfgStore == null
-            && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()))
-            throw new IgniteCheckedException("Cannot enable write-through (writer or store is not provided) " +
-                "for cache: " + U.maskName(cc.getName()));
-
-        long delay = cc.getRebalanceDelay();
-
-        if (delay != 0) {
-            if (cc.getCacheMode() != PARTITIONED)
-                U.warn(log, "Rebalance delay is supported only for partitioned caches (will ignore): " + (cc.getName()));
-            else if (cc.getRebalanceMode() == SYNC) {
-                if (delay < 0) {
-                    U.warn(log, "Ignoring SYNC rebalance mode with manual rebalance start (node will not wait for " +
-                        "rebalancing to be finished): " + U.maskName(cc.getName()));
-                }
-                else {
-                    U.warn(log, "Using SYNC rebalance mode with rebalance delay (node will wait until rebalancing is " +
-                        "initiated for " + delay + "ms) for cache: " + U.maskName(cc.getName()));
-                }
-            }
-        }
-
-        ctx.igfsHelper().validateCacheConfiguration(cc);
-        ctx.coordinators().validateCacheConfiguration(cc);
-
-        if (cc.getAtomicityMode() == ATOMIC)
-            assertParameter(cc.getTransactionManagerLookupClassName() == null,
-                "transaction manager can not be used with ATOMIC cache");
-
-        if ((cc.getEvictionPolicyFactory() != null || cc.getEvictionPolicy() != null) && !cc.isOnheapCacheEnabled())
-            throw new IgniteCheckedException("Onheap cache must be enabled if eviction policy is configured [cacheName="
-                + U.maskName(cc.getName()) + "]");
-
-        if (cacheType != CacheType.DATA_STRUCTURES && DataStructuresProcessor.isDataStructureCache(cc.getName()))
-            throw new IgniteCheckedException("Using cache names reserved for datastructures is not allowed for " +
-                "other cache types [cacheName=" + cc.getName() + ", cacheType=" + cacheType + "]");
-
-        if (cacheType != CacheType.DATA_STRUCTURES && DataStructuresProcessor.isReservedGroup(cc.getGroupName()))
-            throw new IgniteCheckedException("Using cache group names reserved for datastructures is not allowed for " +
-                "other cache types [cacheName=" + cc.getName() + ", groupName=" + cc.getGroupName() +
-                ", cacheType=" + cacheType + "]");
-
-        // Make sure we do not use sql schema for system views.
-        if (ctx.query().moduleEnabled()) {
-            String schema = QueryUtils.normalizeSchemaName(cc.getName(), cc.getSqlSchema());
-
-            if (F.eq(schema, QueryUtils.SCHEMA_SYS)) {
-                if (cc.getSqlSchema() == null) {
-                    // Conflict on cache name.
-                    throw new IgniteCheckedException("SQL schema name derived from cache name is reserved (" +
-                        "please set explicit SQL schema name through CacheConfiguration.setSqlSchema() or choose " +
-                        "another cache name) [cacheName=" + cc.getName() + ", schemaName=" + cc.getSqlSchema() + "]");
-                }
-                else {
-                    // Conflict on schema name.
-                    throw new IgniteCheckedException("SQL schema name is reserved (please choose another one) [" +
-                        "cacheName=" + cc.getName() + ", schemaName=" + cc.getSqlSchema() + ']');
-                }
-            }
-        }
-
-        if (cc.isEncryptionEnabled() && !ctx.clientNode()) {
-            StringBuilder cacheSpec = new StringBuilder("[cacheName=").append(cc.getName())
-                .append(", groupName=").append(cc.getGroupName())
-                .append(", cacheType=").append(cacheType)
-                .append(']');
-
-            if (!CU.isPersistentCache(cc, c.getDataStorageConfiguration())) {
-                throw new IgniteCheckedException("Using encryption is not allowed" +
-                    " for not persistent cache " + cacheSpec.toString());
-            }
-
-            EncryptionSpi encSpi = c.getEncryptionSpi();
-
-            if (encSpi == null) {
-                throw new IgniteCheckedException("EncryptionSpi should be configured to use encrypted cache " +
-                    cacheSpec.toString());
-            }
-
-            if (cc.getDiskPageCompression() != DiskPageCompression.DISABLED)
-                throw new IgniteCheckedException("Encryption cannot be used with disk page compression " +
-                    cacheSpec.toString());
-        }
-    }
-
-    /**
      * @param ctx Context.
      * @return DHT managers.
      */
@@ -837,31 +550,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         }
     }
 
-    /**
-     * @throws IgniteCheckedException If failed.
-     */
-    private void restoreCacheConfigurations() throws IgniteCheckedException {
-        if (ctx.isDaemon())
-            return;
-
-        Map<String, CacheInfo> caches = new HashMap<>();
-
-        Map<String, CacheInfo> templates = new HashMap<>();
-
-        addCacheOnJoinFromConfig(caches, templates);
-
-        CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(
-            IgniteUuid.randomUuid(),
-            caches,
-            templates,
-            startAllCachesOnClientStart()
-        );
-
-        localConfigs = discoData;
-
-        cachesInfo.onStart(discoData);
-    }
-
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked"})
     @Override public void start() throws IgniteCheckedException {
@@ -885,14 +573,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         sharedCtx = createSharedContext(ctx, sessionListeners);
 
+        locCfgMgr = new GridLocalConfigManager(this, ctx);
+
         transactions = new IgniteTransactionsImpl(sharedCtx, null);
 
         // Start shared managers.
         for (GridCacheSharedManager mgr : sharedCtx.managers())
             mgr.start(sharedCtx);
 
-        if (!ctx.isDaemon() && (!CU.isPersistenceEnabled(ctx.config())) || ctx.config().isClientMode())
-            restoreCacheConfigurations();
+        if (!ctx.isDaemon() && (!CU.isPersistenceEnabled(ctx.config())) || ctx.config().isClientMode()) {
+            CacheJoinNodeDiscoveryData data = locCfgMgr.restoreCacheConfigurations();
+
+            if (data != null)
+                cachesInfo.onStart(data);
+        }
 
         if (log.isDebugEnabled())
             log.debug("Started cache processor.");
@@ -901,178 +595,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         ctx.authentication().cacheProcessorStarted();
     }
 
-    /**
-     * @param cfg Cache configuration.
-     * @param sql SQL flag.
-     * @param caches Caches map.
-     * @param templates Templates map.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void addCacheOnJoin(CacheConfiguration<?, ?> cfg, boolean sql,
-        Map<String, CacheInfo> caches,
-        Map<String, CacheInfo> templates) throws IgniteCheckedException {
-        String cacheName = cfg.getName();
-
-        CU.validateCacheName(cacheName);
-
-        cloneCheckSerializable(cfg);
-
-        CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
-
-        // Initialize defaults.
-        initialize(cfg, cacheObjCtx);
-
-        StoredCacheData cacheData = new StoredCacheData(cfg);
-
-        cacheData.sql(sql);
-
-        T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = splitter().split(cfg);
-
-        cacheData.config(splitCfg.get1());
-        cacheData.cacheConfigurationEnrichment(splitCfg.get2());
-
-        cfg = splitCfg.get1();
-
-        if (GridCacheUtils.isCacheTemplateName(cacheName))
-            templates.put(cacheName, new CacheInfo(cacheData, CacheType.USER, false, 0, true));
-        else {
-            if (caches.containsKey(cacheName)) {
-                throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
-                    "assign unique name to each cache): " + cacheName);
-            }
-
-            CacheType cacheType = cacheType(cacheName);
-
-            if (cacheType != CacheType.USER && cfg.getDataRegionName() == null)
-                cfg.setDataRegionName(sharedCtx.database().systemDateRegionName());
-
-            addStoredCache(caches, cacheData, cacheName, cacheType, true);
-        }
-    }
-
-    /**
-     * Add stored cache data to caches storage.
-     *
-     * @param caches Cache storage.
-     * @param cacheData Cache data to add.
-     * @param cacheName Cache name.
-     * @param cacheType Cache type.
-     * @param isStaticalyConfigured Statically configured flag.
-     */
-    private void addStoredCache(Map<String, CacheInfo> caches, StoredCacheData cacheData, String cacheName,
-        CacheType cacheType, boolean isStaticalyConfigured) {
-        if (!caches.containsKey(cacheName)) {
-            if (!cacheType.userCache())
-                stopSeq.addLast(cacheName);
-            else
-                stopSeq.addFirst(cacheName);
-        }
-
-        caches.put(cacheName, new CacheInfo(cacheData, cacheType, cacheData.sql(), 0, isStaticalyConfigured));
-    }
 
     /**
-     * @param caches Caches map.
-     * @param templates Templates map.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void addCacheOnJoinFromConfig(
-        Map<String, CacheInfo> caches,
-        Map<String, CacheInfo> templates
-    ) throws IgniteCheckedException {
-        assert !ctx.config().isDaemon();
-
-        CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
-
-        for (int i = 0; i < cfgs.length; i++) {
-            CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
-
-            // Replace original configuration value.
-            cfgs[i] = cfg;
-
-            addCacheOnJoin(cfg, false, caches, templates);
-        }
-
-        if (CU.isPersistenceEnabled(ctx.config()) && ctx.cache().context().pageStore() != null) {
-            Map<String, StoredCacheData> storedCaches = ctx.cache().context().pageStore().readCacheConfigurations();
-
-            if (!F.isEmpty(storedCaches)) {
-                List<String> skippedConfigs = new ArrayList<>();
-
-                for (StoredCacheData storedCacheData : storedCaches.values()) {
-                    // Backward compatibility for old stored caches data.
-                    if (storedCacheData.hasOldCacheConfigurationFormat()) {
-                        storedCacheData = new StoredCacheData(storedCacheData);
-
-                        T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = splitter().split(storedCacheData.config());
-
-                        storedCacheData.config(splitCfg.get1());
-                        storedCacheData.cacheConfigurationEnrichment(splitCfg.get2());
-
-                        // Overwrite with new format.
-                        saveCacheConfiguration(storedCacheData);
-                    }
-
-                    String cacheName = storedCacheData.config().getName();
-
-                    CacheType type = cacheType(cacheName);
-
-                    if (!caches.containsKey(cacheName))
-                        // No static cache - add the configuration.
-                        addStoredCache(caches, storedCacheData, cacheName, type, false);
-                    else {
-                        // A static cache with the same name already exists.
-                        CacheConfiguration cfg = caches.get(cacheName).cacheData().config();
-                        CacheConfiguration cfgFromStore = storedCacheData.config();
-
-                        validateCacheConfigurationOnRestore(cfg, cfgFromStore);
-
-                        if (!keepStaticCacheConfiguration) {
-                            addStoredCache(caches, storedCacheData, cacheName, type, false);
-
-                            if (type == CacheType.USER)
-                                skippedConfigs.add(cacheName);
-                        }
-                    }
-                }
-
-                if (!F.isEmpty(skippedConfigs))
-                    U.warn(log, "Static configuration for the following caches will be ignored because a persistent " +
-                        "cache with the same name already exist (see " +
-                        "https://apacheignite.readme.io/docs/cache-configuration for more information): " +
-                        skippedConfigs);
-            }
-        }
-    }
-
-    /**
-     * Validates cache configuration against stored cache configuration when persistence is enabled.
-     *
-     * @param cfg Configured cache configuration.
-     * @param cfgFromStore Stored cache configuration
-     * @throws IgniteCheckedException If validation failed.
+     * @param cfg Initializes cache configuration with proper defaults.
+     * @param cacheObjCtx Cache object context.
+     * @throws IgniteCheckedException If configuration is not valid.
      */
-    private void validateCacheConfigurationOnRestore(CacheConfiguration cfg, CacheConfiguration cfgFromStore)
-        throws IgniteCheckedException {
-        assert cfg != null && cfgFromStore != null;
-
-        if ((cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT ||
-            cfgFromStore.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
-            && cfg.getAtomicityMode() != cfgFromStore.getAtomicityMode()) {
-            throw new IgniteCheckedException("Cannot start cache. Statically configured atomicity mode differs from " +
-                "previously stored configuration. Please check your configuration: [cacheName=" + cfg.getName() +
-                ", configuredAtomicityMode=" + cfg.getAtomicityMode() +
-                ", storedAtomicityMode=" + cfgFromStore.getAtomicityMode() + "]");
-        }
-
-        boolean staticCfgVal = cfg.isEncryptionEnabled();
-
-        boolean storedVal = cfgFromStore.isEncryptionEnabled();
+    void initialize(CacheConfiguration cfg, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
+        CU.initializeConfigDefaults(log, cfg, cacheObjCtx);
 
-        if (storedVal != staticCfgVal) {
-            throw new IgniteCheckedException("Encrypted flag value differs. Static config value is '" + staticCfgVal +
-                "' and value stored on the disk is '" + storedVal + "'");
-        }
+        ctx.coordinators().preProcessCacheConfiguration(cfg);
     }
 
     /**
@@ -1093,32 +625,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param rmtNode Remote node to check.
-     * @return Data storage configuration
-     */
-    private DataStorageConfiguration extractDataStorage(ClusterNode rmtNode) {
-        return GridCacheUtils.extractDataStorage(
-            rmtNode,
-            ctx.marshallerContext().jdkMarshaller(),
-            U.resolveClassLoader(ctx.config())
-        );
-    }
-
-    /**
-     * @param dataStorageCfg User-defined data regions.
-     */
-    private Map<String, DataRegionConfiguration> dataRegionCfgs(DataStorageConfiguration dataStorageCfg) {
-        if(dataStorageCfg != null) {
-            return Optional.ofNullable(dataStorageCfg.getDataRegionConfigurations())
-                .map(Stream::of)
-                .orElseGet(Stream::empty)
-                .collect(Collectors.toMap(DataRegionConfiguration::getName, e -> e));
-        }
-
-        return Collections.emptyMap();
-    }
-
-    /**
      * @param grpId Group ID.
      * @return Cache group.
      */
@@ -1142,7 +648,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             boolean checkConsistency = !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK);
 
             if (checkConsistency)
-                checkConsistency();
+                ValidationOnNodeJoinUtils.checkConsistency(ctx, log);
 
             cachesInfo.onKernalStart(checkConsistency);
 
@@ -1197,33 +703,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         ctx.timeout().addTimeoutObject(new RemovedItemsCleanupTask(timeout));
     }
 
-    /**
-     * @throws IgniteCheckedException if check failed.
-     */
-    private void checkConsistency() throws IgniteCheckedException {
-        Collection<ClusterNode> rmtNodes = ctx.discovery().remoteNodes();
-
-        boolean changeablePoolSize = IgniteFeatures.allNodesSupports(rmtNodes, IgniteFeatures.DIFFERENT_REBALANCE_POOL_SIZE);
-
-        for (ClusterNode n : rmtNodes) {
-            if (Boolean.TRUE.equals(n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED)))
-                continue;
-
-            if(!changeablePoolSize)
-                checkRebalanceConfiguration(n);
-
-            checkTransactionConfiguration(n);
-
-            checkMemoryConfiguration(n);
-
-            DeploymentMode locDepMode = ctx.config().getDeploymentMode();
-            DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
-
-            CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
-                locDepMode, rmtDepMode, true);
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public void stop(boolean cancel) throws IgniteCheckedException {
         stopCaches(cancel);
@@ -1248,7 +727,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param cancel Cancel.
      */
     public void stopCaches(boolean cancel) {
-        for (String cacheName : stopSeq) {
+        for (String cacheName : locCfgMgr.stopSequence()) {
             GridCacheAdapter<?, ?> cache = stoppedCaches.remove(cacheName);
 
             if (cache != null)
@@ -1315,7 +794,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             aff.cancelFutures(affErr);
         }
 
-        for (String cacheName : stopSeq) {
+        for (String cacheName : locCfgMgr.stopSequence()) {
             GridCacheAdapter<?, ?> cache = caches.remove(cacheName);
 
             if (cache != null) {
@@ -1680,7 +1159,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null;
 
-        validate(ctx.config(), cfg, desc.cacheType(), cfgStore);
+        ValidationOnNodeJoinUtils.validate(ctx.config(), cfg, desc.cacheType(), cfgStore, ctx, log, (x, y) -> {
+            try {
+                assertParameter(x, y);
+            }
+            catch (IgniteCheckedException ex) {
+                return ex;
+            }
+
+            return null;
+        });
 
         if (pluginMgr == null)
             pluginMgr = new CachePluginManager(ctx, cfg);
@@ -2131,10 +1619,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Caches to be started when this node starts.
      */
     @Nullable public LocalJoinCachesContext localJoinCachesContext() {
-        if (ctx.discovery().localNode().order() == 1 && localConfigs != null)
-            cachesInfo.filterDynamicCacheDescriptors(localConfigs);
-
-        localConfigs = null;
+        if (ctx.discovery().localNode().order() == 1 && alreadyFiltered.compareAndSet(false, true)) {
+            cachesInfo.filterDynamicCacheDescriptors(locCfgMgr.localCachesOnStart());
+        }
 
         return cachesInfo.localJoinCachesContext();
     }
@@ -3237,6 +2724,32 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param rmtNode Remote node to check.
+     * @return Data storage configuration
+     */
+    private DataStorageConfiguration extractDataStorage(ClusterNode rmtNode) {
+        return GridCacheUtils.extractDataStorage(
+            rmtNode,
+            ctx.marshallerContext().jdkMarshaller(),
+            U.resolveClassLoader(ctx.config())
+        );
+    }
+
+    /**
+     * @param dataStorageCfg User-defined data regions.
+     */
+    private Map<String, DataRegionConfiguration> dataRegionCfgs(DataStorageConfiguration dataStorageCfg) {
+        if (dataStorageCfg != null) {
+            return Optional.ofNullable(dataStorageCfg.getDataRegionConfigurations())
+                .map(Stream::of)
+                .orElseGet(Stream::empty)
+                .collect(Collectors.toMap(DataRegionConfiguration::getName, e -> e));
+        }
+
+        return Collections.emptyMap();
+    }
+
+    /**
      * Force checkpoint and remove offheap checkpoint listener after it was finished.
      *
      * @param grpToStop Cache group to stop.
@@ -3503,7 +3016,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
-        cachesInfo.collectGridNodeData(dataBag);
+        cachesInfo.collectGridNodeData(dataBag, backwardCompatibleSplitter());
     }
 
     /** {@inheritDoc} */
@@ -3525,132 +3038,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if(!cachesInfo.isMergeConfigSupports(node))
             return null;
 
-        if (discoData.hasJoiningNodeData() && discoData.joiningNodeData() instanceof CacheJoinNodeDiscoveryData) {
-            CacheJoinNodeDiscoveryData nodeData = (CacheJoinNodeDiscoveryData)discoData.joiningNodeData();
-
-            boolean isGridActive = ctx.state().clusterState().active();
-
-            StringBuilder errorMsg = new StringBuilder();
-
-            SecurityContext secCtx = null;
+        String validationRes = cachesInfo.validateJoiningNodeData(discoData);
 
-            if (ctx.security().enabled()) {
-                try {
-                    secCtx = nodeSecurityContext(marsh, U.resolveClassLoader(ctx.config()), node);
-                }
-                catch (SecurityException se) {
-                    errorMsg.append(se.getMessage());
-                }
-            }
-
-            if (!node.isClient()) {
-                validateRmtRegions(node).forEach(error -> {
-                    if (errorMsg.length() > 0)
-                        errorMsg.append("\n");
+        if (validationRes != null)
+            return new IgniteNodeValidationResult(node.id(), validationRes, validationRes);
 
-                    errorMsg.append(error);
-                });
-            }
-
-            for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : nodeData.caches().values()) {
-                if (secCtx != null && cacheInfo.cacheType() == CacheType.USER) {
-                    try (OperationSecurityContext s = ctx.security().withContext(secCtx)) {
-                        authorizeCacheCreate(cacheInfo.cacheData().config());
-                    }
-                    catch (SecurityException ex) {
-                        if (errorMsg.length() > 0)
-                            errorMsg.append("\n");
-
-                        errorMsg.append(ex.getMessage());
-                    }
-                }
-
-                DynamicCacheDescriptor locDesc = cacheDescriptor(cacheInfo.cacheData().config().getName());
-
-                if (locDesc == null)
-                    continue;
-
-                QuerySchemaPatch schemaPatch = locDesc.makeSchemaPatch(cacheInfo.cacheData().queryEntities());
-
-                if (schemaPatch.hasConflicts() || (isGridActive && !schemaPatch.isEmpty())) {
-                    if (errorMsg.length() > 0)
-                        errorMsg.append("\n");
-
-                    if (schemaPatch.hasConflicts())
-                        errorMsg.append(String.format(MERGE_OF_CONFIG_CONFLICTS_MESSAGE,
-                            locDesc.cacheName(), schemaPatch.getConflictsMessage()));
-                    else
-                        errorMsg.append(String.format(MERGE_OF_CONFIG_REQUIRED_MESSAGE, locDesc.cacheName()));
-                }
-
-                // This check must be done on join, otherwise group encryption key will be
-                // written to metastore regardless of validation check and could trigger WAL write failures.
-                boolean locEnc = locDesc.cacheConfiguration().isEncryptionEnabled();
-                boolean rmtEnc = cacheInfo.cacheData().config().isEncryptionEnabled();
-
-                if (locEnc != rmtEnc) {
-                    if (errorMsg.length() > 0)
-                        errorMsg.append("\n");
-
-                    // Message will be printed on remote node, so need to swap local and remote.
-                    errorMsg.append(String.format(ENCRYPT_MISMATCH_MESSAGE, locDesc.cacheName(), rmtEnc, locEnc));
-                }
-            }
-
-            if (errorMsg.length() > 0) {
-                String msg = errorMsg.toString();
-
-                return new IgniteNodeValidationResult(node.id(), msg);
-            }
-        }
-
-        return null;
-    }
-
-    /**
-     * @param rmtNode Joining node.
-     * @return List of validation errors.
-     */
-    private List<String> validateRmtRegions(ClusterNode rmtNode) {
-        List<String> errorMessages = new ArrayList<>();
-
-        DataStorageConfiguration rmtStorageCfg = extractDataStorage(rmtNode);
-        Map<String, DataRegionConfiguration> rmtRegionCfgs = dataRegionCfgs(rmtStorageCfg);
-
-        DataStorageConfiguration locStorageCfg = ctx.config().getDataStorageConfiguration();
-
-        if (isDefaultDataRegionPersistent(locStorageCfg) != isDefaultDataRegionPersistent(rmtStorageCfg)) {
-            errorMessages.add(String.format(
-                INVALID_REGION_CONFIGURATION_MESSAGE,
-                "DEFAULT",
-                ctx.localNodeId(),
-                isDefaultDataRegionPersistent(locStorageCfg),
-                rmtNode.id(),
-                isDefaultDataRegionPersistent(rmtStorageCfg)
-            ));
-        }
-
-        for (ClusterNode clusterNode : ctx.discovery().aliveServerNodes()) {
-            Map<String, DataRegionConfiguration> nodeRegionCfg = dataRegionCfgs(extractDataStorage(clusterNode));
-
-            for (Map.Entry<String, DataRegionConfiguration> nodeRegionCfgEntry : nodeRegionCfg.entrySet()) {
-                String regionName = nodeRegionCfgEntry.getKey();
-
-                DataRegionConfiguration rmtRegionCfg = rmtRegionCfgs.get(regionName);
-
-                if (rmtRegionCfg != null && rmtRegionCfg.isPersistenceEnabled() != nodeRegionCfgEntry.getValue().isPersistenceEnabled())
-                    errorMessages.add(String.format(
-                        INVALID_REGION_CONFIGURATION_MESSAGE,
-                        regionName,
-                        ctx.localNodeId(),
-                        nodeRegionCfgEntry.getValue().isPersistenceEnabled(),
-                        rmtNode.id(),
-                        rmtRegionCfg.isPersistenceEnabled()
-                    ));
-            }
-        }
-
-        return errorMessages;
+        return ValidationOnNodeJoinUtils.validateNode(node, discoData, marsh, ctx, this::cacheDescriptor);
     }
 
     /**
@@ -3836,13 +3229,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @return {@code True} if need locally start all existing caches on client node start.
-     */
-    private boolean startAllCachesOnClientStart() {
-        return startClientCaches && ctx.clientNode();
-    }
-
-    /**
      * Dynamically starts cache using template configuration.
      *
      * @param cacheName Cache name.
@@ -4297,7 +3683,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /** Resolve cache type for input cacheType */
-    @NotNull private CacheType resolveCacheType(CacheConfiguration ccfg) {
+    private @NotNull CacheType resolveCacheType(CacheConfiguration ccfg) {
         if (CU.isUtilityCache(ccfg.getName()))
             return CacheType.UTILITY;
         else if (internalCaches.contains(ccfg.getName()))
@@ -4384,7 +3770,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param destroy Cache data destroy flag. Setting to {@code true} will cause removing all cache data from store.
      * @return Future that will be completed when cache is destroyed.
      */
-    @NotNull public DynamicCacheChangeRequest createStopRequest(String cacheName, boolean restart, IgniteUuid restartId, boolean destroy) {
+    public @NotNull DynamicCacheChangeRequest createStopRequest(String cacheName, boolean restart, IgniteUuid restartId, boolean destroy) {
         DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, false, true);
 
         req.stop(true);
@@ -4401,7 +3787,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param reqs cache stop requests.
      * @return compound future.
      */
-    @NotNull public IgniteInternalFuture<?> dynamicChangeCaches(List<DynamicCacheChangeRequest> reqs) {
+    public @NotNull IgniteInternalFuture<?> dynamicChangeCaches(List<DynamicCacheChangeRequest> reqs) {
         GridCompoundFuture<?, ?> compoundFut = new GridCompoundFuture<>();
 
         for (DynamicCacheStartFuture fut : initiateCacheChanges(reqs))
@@ -4522,22 +3908,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     public void saveCacheConfiguration(DynamicCacheDescriptor desc) throws IgniteCheckedException {
         assert desc != null;
 
-        if (sharedCtx.pageStore() != null && !sharedCtx.kernalContext().clientNode() &&
-            isPersistentCache(desc.cacheConfiguration(), sharedCtx.gridConfig().getDataStorageConfiguration()))
-            sharedCtx.pageStore().storeCacheData(desc.toStoredData(splitter), true);
+        locCfgMgr.saveCacheConfiguration(desc.toStoredData(splitter), true);
     }
 
     /**
      * Save cache configuration to persistent store if necessary.
      *
      * @param storedCacheData Stored cache data.
+     * @param overwrite Overwrite existing.
      */
-    public void saveCacheConfiguration(StoredCacheData storedCacheData) throws IgniteCheckedException {
+    public void saveCacheConfiguration(StoredCacheData storedCacheData, boolean overwrite) throws IgniteCheckedException {
         assert storedCacheData != null;
 
-        if (sharedCtx.pageStore() != null && !sharedCtx.kernalContext().clientNode() &&
-            isPersistentCache(storedCacheData.config(), sharedCtx.gridConfig().getDataStorageConfiguration()))
-            sharedCtx.pageStore().storeCacheData(storedCacheData, true);
+        locCfgMgr.saveCacheConfiguration(storedCacheData, overwrite);
     }
 
     /**
@@ -4635,8 +4018,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * Authorize creating cache.
      *
      * @param cfg Cache configuration.
+     * @param ctx Kernal context.
      */
-    private void authorizeCacheCreate(CacheConfiguration cfg) {
+    static void authorizeCacheCreate(CacheConfiguration cfg, GridKernalContext ctx) {
         if(cfg != null) {
             ctx.security().authorize(cfg.getName(), SecurityPermission.CACHE_CREATE);
 
@@ -4656,14 +4040,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             if (req.stop())
                 ctx.security().authorize(req.cacheName(), SecurityPermission.CACHE_DESTROY);
             else
-                authorizeCacheCreate(req.startCacheConfiguration());
+                authorizeCacheCreate(req.startCacheConfiguration(), ctx);
         }
     }
 
     /**
      * @return Non null exception if node is stopping or disconnected.
      */
-    @Nullable private IgniteCheckedException checkNodeState() {
+    private @Nullable IgniteCheckedException checkNodeState() {
         if (ctx.isStopping()) {
             return new IgniteCheckedException("Failed to execute dynamic cache change request, " +
                 "node is stopping.");
@@ -4747,42 +4131,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         return false;
     }
 
-    /**
-     * Checks that preload-order-dependant caches has SYNC or ASYNC preloading mode.
-     *
-     * @param cfgs Caches.
-     * @return Maximum detected preload order.
-     * @throws IgniteCheckedException If validation failed.
-     */
-    private int validatePreloadOrder(CacheConfiguration[] cfgs) throws IgniteCheckedException {
-        int maxOrder = 0;
-
-        for (CacheConfiguration cfg : cfgs) {
-            int rebalanceOrder = cfg.getRebalanceOrder();
-
-            if (rebalanceOrder > 0) {
-                if (cfg.getCacheMode() == LOCAL)
-                    throw new IgniteCheckedException("Rebalance order set for local cache (fix configuration and restart the " +
-                        "node): " + U.maskName(cfg.getName()));
-
-                if (cfg.getRebalanceMode() == CacheRebalanceMode.NONE)
-                    throw new IgniteCheckedException("Only caches with SYNC or ASYNC rebalance mode can be set as rebalance " +
-                        "dependency for other caches [cacheName=" + U.maskName(cfg.getName()) +
-                        ", rebalanceMode=" + cfg.getRebalanceMode() + ", rebalanceOrder=" + cfg.getRebalanceOrder() + ']');
-
-                maxOrder = Math.max(maxOrder, rebalanceOrder);
-            }
-            else if (rebalanceOrder < 0)
-                throw new IgniteCheckedException("Rebalance order cannot be negative for cache (fix configuration and restart " +
-                    "the node) [cacheName=" + cfg.getName() + ", rebalanceOrder=" + rebalanceOrder + ']');
-        }
-
-        return maxOrder;
-    }
-
     /** {@inheritDoc} */
-    @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node) {
-        IgniteNodeValidationResult res = validateHashIdResolvers(node);
+    @Override public @Nullable IgniteNodeValidationResult validateNode(ClusterNode node) {
+        IgniteNodeValidationResult res = validateHashIdResolvers(node, ctx, cacheDescriptors());
 
         if (res == null)
             res = validateRestartingCaches(node);
@@ -4814,145 +4165,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param node Joining node.
-     * @return Validation result or {@code null} in case of success.
-     */
-    @Nullable private IgniteNodeValidationResult validateHashIdResolvers(ClusterNode node) {
-        if (!node.isClient()) {
-            for (DynamicCacheDescriptor desc : cacheDescriptors().values()) {
-                CacheConfiguration cfg = desc.cacheConfiguration();
-
-                if (cfg.getAffinity() instanceof RendezvousAffinityFunction) {
-                    RendezvousAffinityFunction aff = (RendezvousAffinityFunction)cfg.getAffinity();
-
-                    Object nodeHashObj = aff.resolveNodeHash(node);
-
-                    for (ClusterNode topNode : ctx.discovery().aliveServerNodes()) {
-                        Object topNodeHashObj = aff.resolveNodeHash(topNode);
-
-                        if (nodeHashObj.hashCode() == topNodeHashObj.hashCode()) {
-                            String errMsg = "Failed to add node to topology because it has the same hash code for " +
-                                "partitioned affinity as one of existing nodes [cacheName=" +
-                                cfg.getName() + ", existingNodeId=" + topNode.id() + ']';
-
-                            String sndMsg = "Failed to add node to topology because it has the same hash code for " +
-                                "partitioned affinity as one of existing nodes [cacheName=" +
-                                cfg.getName() + ", existingNodeId=" + topNode.id() + ']';
-
-                            return new IgniteNodeValidationResult(topNode.id(), errMsg, sndMsg);
-                        }
-                    }
-                }
-            }
-        }
-
-        return null;
-    }
-
-    /**
-     * @param rmt Remote node to check.
-     * @throws IgniteCheckedException If check failed.
-     */
-    private void checkTransactionConfiguration(ClusterNode rmt) throws IgniteCheckedException {
-        TransactionConfiguration rmtTxCfg = rmt.attribute(ATTR_TX_CONFIG);
-
-        if (rmtTxCfg != null) {
-            TransactionConfiguration locTxCfg = ctx.config().getTransactionConfiguration();
-
-            checkDeadlockDetectionConfig(rmt, rmtTxCfg, locTxCfg);
-
-            checkSerializableEnabledConfig(rmt, rmtTxCfg, locTxCfg);
-        }
-    }
-
-    /** */
-    private void checkDeadlockDetectionConfig(ClusterNode rmt, TransactionConfiguration rmtTxCfg,
-        TransactionConfiguration locTxCfg) {
-        boolean locDeadlockDetectionEnabled = locTxCfg.getDeadlockTimeout() > 0;
-        boolean rmtDeadlockDetectionEnabled = rmtTxCfg.getDeadlockTimeout() > 0;
-
-        if (locDeadlockDetectionEnabled != rmtDeadlockDetectionEnabled) {
-            U.warn(log, "Deadlock detection is enabled on one node and disabled on another. " +
-                "Disabled detection on one node can lead to undetected deadlocks. [rmtNodeId=" + rmt.id() +
-                ", locDeadlockTimeout=" + locTxCfg.getDeadlockTimeout() +
-                ", rmtDeadlockTimeout=" + rmtTxCfg.getDeadlockTimeout());
-        }
-    }
-
-    /** */
-    private void checkSerializableEnabledConfig(ClusterNode rmt, TransactionConfiguration rmtTxCfg,
-        TransactionConfiguration locTxCfg) throws IgniteCheckedException {
-        if (locTxCfg.isTxSerializableEnabled() != rmtTxCfg.isTxSerializableEnabled())
-            throw new IgniteCheckedException("Serializable transactions enabled mismatch " +
-                "(fix txSerializableEnabled property or set -D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true " +
-                "system property) [rmtNodeId=" + rmt.id() +
-                ", locTxSerializableEnabled=" + locTxCfg.isTxSerializableEnabled() +
-                ", rmtTxSerializableEnabled=" + rmtTxCfg.isTxSerializableEnabled() + ']');
-    }
-
-    /**
-     * @param rmt Remote node to check.
-     * @throws IgniteCheckedException If check failed.
-     */
-    private void checkMemoryConfiguration(ClusterNode rmt) throws IgniteCheckedException {
-        ClusterNode locNode = ctx.discovery().localNode();
-
-        if (ctx.config().isClientMode() || locNode.isDaemon() || rmt.isClient() || rmt.isDaemon())
-            return;
-
-        DataStorageConfiguration dsCfg = null;
-
-        Object dsCfgBytes = rmt.attribute(IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG);
-
-        if (dsCfgBytes instanceof byte[])
-            dsCfg = new JdkMarshaller().unmarshal((byte[])dsCfgBytes, U.resolveClassLoader(ctx.config()));
-
-        if (dsCfg == null) {
-            // Try to use legacy memory configuration.
-            MemoryConfiguration memCfg = rmt.attribute(IgniteNodeAttributes.ATTR_MEMORY_CONFIG);
-
-            if (memCfg != null) {
-                dsCfg = new DataStorageConfiguration();
-
-                // All properties that are used in validation should be converted here.
-                dsCfg.setPageSize(memCfg.getPageSize());
-            }
-        }
-
-        if (dsCfg != null) {
-            DataStorageConfiguration locDsCfg = ctx.config().getDataStorageConfiguration();
-
-            if (dsCfg.getPageSize() != locDsCfg.getPageSize()) {
-                throw new IgniteCheckedException("Memory configuration mismatch (fix configuration or set -D" +
-                    IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property) [rmtNodeId=" + rmt.id() +
-                    ", locPageSize = " + locDsCfg.getPageSize() + ", rmtPageSize = " + dsCfg.getPageSize() + "]");
-            }
-        }
-    }
-
-    /**
-     * @param rmt Remote node to check.
-     * @throws IgniteCheckedException If check failed.
-     */
-    private void checkRebalanceConfiguration(ClusterNode rmt) throws IgniteCheckedException {
-        ClusterNode locNode = ctx.discovery().localNode();
-
-        if (ctx.config().isClientMode() || locNode.isDaemon() || rmt.isClient() || rmt.isDaemon())
-            return;
-
-        Integer rebalanceThreadPoolSize = rmt.attribute(IgniteNodeAttributes.ATTR_REBALANCE_POOL_SIZE);
-
-        if (rebalanceThreadPoolSize != null && rebalanceThreadPoolSize != ctx.config().getRebalanceThreadPoolSize()) {
-            throw new IgniteCheckedException("Rebalance configuration mismatch (fix configuration or set -D" +
-                IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property)." +
-                " Different values of such parameter may lead to rebalance process instability and hanging. " +
-                " [rmtNodeId=" + rmt.id() +
-                ", locRebalanceThreadPoolSize = " + ctx.config().getRebalanceThreadPoolSize() +
-                ", rmtRebalanceThreadPoolSize = " + rebalanceThreadPoolSize + "]");
-        }
-    }
-
-    /**
      * @param cfg Cache configuration.
      * @return Query manager.
      */
@@ -5072,23 +4284,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return All configured cache instances.
      */
     public Collection<IgniteInternalCache<?, ?>> caches() {
-        return F.viewReadOnly(jCacheProxies.values(), new IgniteClosure<IgniteCacheProxy<?, ?>,
-            IgniteInternalCache<?, ?>>() {
-            @Override public IgniteInternalCache<?, ?> apply(IgniteCacheProxy<?, ?> entries) {
-                return entries.internalProxy();
-            }
-        });
+        return F.viewReadOnly(jCacheProxies.values(),
+            (IgniteClosure<IgniteCacheProxy<?, ?>, IgniteInternalCache<?, ?>>)IgniteCacheProxy::internalProxy);
     }
 
     /**
      * @return All configured cache instances.
      */
     public Collection<IgniteCacheProxy<?, ?>> jcaches() {
-        return F.viewReadOnly(jCacheProxies.values(), new IgniteClosure<IgniteCacheProxyImpl<?, ?>, IgniteCacheProxy<?, ?>>() {
-            @Override public IgniteCacheProxy<?, ?> apply(IgniteCacheProxyImpl<?, ?> proxy) {
-                return proxy.gatewayWrapper();
-            }
-        });
+        return F.viewReadOnly(jCacheProxies.values(),
+            (IgniteClosure<IgniteCacheProxyImpl<?, ?>, IgniteCacheProxy<?, ?>>)IgniteCacheProxyImpl::gatewayWrapper);
     }
 
     /**
@@ -5178,7 +4383,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Cache instance for given name.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(String cacheName,
+    @SuppressWarnings({"unchecked", "ConstantConditions"})
+    public @Nullable <K, V> IgniteCacheProxy<K, V> publicJCache(String cacheName,
         boolean failIfNotStarted,
         boolean checkThreadTx) throws IgniteCheckedException {
         assert cacheName != null;
@@ -5296,7 +4502,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param cacheId Cache ID.
      * @return Cache descriptor.
      */
-    @Nullable public DynamicCacheDescriptor cacheDescriptor(int cacheId) {
+    public @Nullable DynamicCacheDescriptor cacheDescriptor(int cacheId) {
         for (DynamicCacheDescriptor cacheDesc : cacheDescriptors().values()) {
             CacheConfiguration ccfg = cacheDesc.cacheConfiguration();
 
@@ -5323,7 +4529,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (desc != null)
             return;
 
-        DynamicCacheChangeRequest req = DynamicCacheChangeRequest.addTemplateRequest(ctx, cacheCfg);
+        DynamicCacheChangeRequest req = DynamicCacheChangeRequest.addTemplateRequest(ctx, cacheCfg,
+            backwardCompatibleSplitter().split(cacheCfg));
 
         TemplateConfigurationFuture fut = new TemplateConfigurationFuture(req.cacheName(), req.deploymentId());
 
@@ -5393,7 +4600,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param awaitInit Await proxy initialization.
      * @return Cache proxy.
      */
-    @Nullable public IgniteCacheProxyImpl<?, ?> jcacheProxy(String name, boolean awaitInit) {
+    public @Nullable IgniteCacheProxyImpl<?, ?> jcacheProxy(String name, boolean awaitInit) {
         IgniteCacheProxyImpl<?, ?> cache = jCacheProxies.get(name);
 
         if (awaitInit)
@@ -5407,7 +4614,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param proxy Cache proxy.
      * @return Previous cache proxy.
      */
-    @Nullable public IgniteCacheProxyImpl<?, ?> addjCacheProxy(String name, IgniteCacheProxyImpl<?, ?> proxy) {
+    public @Nullable IgniteCacheProxyImpl<?, ?> addjCacheProxy(String name, IgniteCacheProxyImpl<?, ?> proxy) {
         return jCacheProxies.putIfAbsent(name, proxy);
     }
 
@@ -5666,7 +4873,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Configuration copy.
      * @throws IgniteCheckedException If validation failed.
      */
-    private CacheConfiguration cloneCheckSerializable(final CacheConfiguration val) throws IgniteCheckedException {
+    CacheConfiguration cloneCheckSerializable(final CacheConfiguration val) throws IgniteCheckedException {
         if (val == null)
             return null;
 
@@ -6015,7 +5222,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * @param oldFormat Old format.
      */
-    public CacheConfigurationSplitter splitter(boolean oldFormat) {
+    private CacheConfigurationSplitter splitter(boolean oldFormat) {
         // Requesting splitter with old format support is rare operation.
         // It's acceptable to allocate it every time by request.
         return oldFormat ? new CacheConfigurationSplitterOldFormat(enricher) : splitter;
@@ -6034,7 +5241,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      *
      * @return Cache configuration splitter with or without old format support depending on cluster state.
      */
-    public CacheConfigurationSplitter backwardCompatibleSplitter() {
+    private CacheConfigurationSplitter backwardCompatibleSplitter() {
         IgniteDiscoverySpi spi = (IgniteDiscoverySpi) ctx.discovery().getInjectedDiscoverySpi();
 
         boolean oldFormat = !spi.allNodesSupport(IgniteFeatures.SPLITTED_CACHE_CONFIGURATIONS);
@@ -6072,7 +5279,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         /** {@inheritDoc} */
         @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException {
-            restoreCacheConfigurations();
+            CacheJoinNodeDiscoveryData data = locCfgMgr.restoreCacheConfigurations();
+
+            cachesInfo.onStart(data);
         }
 
         /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
new file mode 100644
index 0000000..c59b3fb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistentCache;
+
+/**
+ * Responsible for restoring local cache configurations (both from static configuration and persistence).
+ * Keep stop sequence of caches and caches which were presented on node before node join.
+ */
+public class GridLocalConfigManager {
+    /** */
+    private final boolean startClientCaches =
+        IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN, false);
+
+    /** Caches stop sequence. */
+    private final Deque<String> stopSeq = new LinkedList<>();
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Node's local caches on start (both from static configuration and from persistent caches). */
+    private Set<String> localCachesOnStart;
+
+    /** Cache processor. */
+    private final GridCacheProcessor cacheProcessor;
+
+    /** Context. */
+    private final GridKernalContext ctx;
+
+    /**
+     * @param cacheProcessor Cache processor.
+     * @param kernalCtx Kernal context.
+     */
+    public GridLocalConfigManager(
+        GridCacheProcessor cacheProcessor,
+        GridKernalContext kernalCtx
+    ) {
+        this.cacheProcessor = cacheProcessor;
+        ctx = kernalCtx;
+        log = ctx.log(getClass());
+    }
+
+    /**
+     * Save cache configuration to persistent store if necessary.
+     *
+     * @param storedCacheData Stored cache data.
+     * @param overwrite Overwrite existing.
+     */
+    public void saveCacheConfiguration(StoredCacheData storedCacheData, boolean overwrite) throws IgniteCheckedException {
+        assert storedCacheData != null;
+
+        GridCacheSharedContext<Object, Object> sharedContext = cacheProcessor.context();
+
+        if (sharedContext.pageStore() != null
+            && !sharedContext.kernalContext().clientNode()
+            && isPersistentCache(storedCacheData.config(), sharedContext.gridConfig().getDataStorageConfiguration()))
+            sharedContext.pageStore().storeCacheData(storedCacheData, overwrite);
+    }
+
+    /**
+     *
+     */
+    public Collection<String> stopSequence() {
+        return stopSeq;
+    }
+
+    /**
+     * @return Caches to be started when this node starts.
+     */
+    public Set<String> localCachesOnStart() {
+        return localCachesOnStart;
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public CacheJoinNodeDiscoveryData restoreCacheConfigurations() throws IgniteCheckedException {
+        if (ctx.isDaemon())
+            return null;
+
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches = new HashMap<>();
+
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates = new HashMap<>();
+
+        restoreCaches(caches, templates, ctx.config(), ctx.cache().context().pageStore());
+
+        CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(
+            IgniteUuid.randomUuid(),
+            caches,
+            templates,
+            startAllCachesOnClientStart()
+        );
+
+        localCachesOnStart = new HashSet<>(discoData.caches().keySet());
+
+        return discoData;
+    }
+
+    /**
+     * @return {@code True} if need locally start all existing caches on client node start.
+     */
+    private boolean startAllCachesOnClientStart() {
+        return startClientCaches && ctx.clientNode();
+    }
+
+    /**
+     * @param caches Caches accumulator.
+     * @param templates Templates accumulator.
+     * @param config Ignite configuration.
+     * @param pageStoreManager Page store manager.
+     */
+    private void restoreCaches(
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates,
+        IgniteConfiguration config,
+        IgnitePageStoreManager pageStoreManager
+    ) throws IgniteCheckedException {
+        assert !config.isDaemon() : "Trying to restore cache configurations on daemon node.";
+
+        CacheConfiguration[] cfgs = config.getCacheConfiguration();
+
+        for (int i = 0; i < cfgs.length; i++) {
+            CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
+
+            // Replace original configuration value.
+            cfgs[i] = cfg;
+
+            addCacheFromConfiguration(cfg, false, caches, templates);
+        }
+
+        if (CU.isPersistenceEnabled(config) && pageStoreManager != null) {
+            Map<String, StoredCacheData> storedCaches = pageStoreManager.readCacheConfigurations();
+
+            if (!F.isEmpty(storedCaches)) {
+                List<String> skippedConfigs = new ArrayList<>();
+
+                for (StoredCacheData storedCacheData : storedCaches.values()) {
+                    // Backward compatibility for old stored caches data.
+                    if (storedCacheData.hasOldCacheConfigurationFormat()) {
+                        storedCacheData = new StoredCacheData(storedCacheData);
+
+                        T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg =
+                            cacheProcessor.splitter().split(storedCacheData.config());
+
+                        storedCacheData.config(splitCfg.get1());
+                        storedCacheData.cacheConfigurationEnrichment(splitCfg.get2());
+
+                        // Overwrite with new format.
+                        saveCacheConfiguration(storedCacheData, true);
+                    }
+
+                    String cacheName = storedCacheData.config().getName();
+
+                    CacheType type = ctx.cache().cacheType(cacheName);
+
+                    if (!caches.containsKey(cacheName))
+                        // No static cache - add the configuration.
+                        addStoredCache(caches, storedCacheData, cacheName, type, true, false);
+                    else {
+                        // A static cache with the same name already exists.
+                        CacheConfiguration cfg = caches.get(cacheName).cacheData().config();
+                        CacheConfiguration cfgFromStore = storedCacheData.config();
+
+                        validateCacheConfigurationOnRestore(cfg, cfgFromStore);
+
+                        addStoredCache(caches, storedCacheData, cacheName, type, true,
+                            cacheProcessor.keepStaticCacheConfiguration());
+
+                        if (!cacheProcessor.keepStaticCacheConfiguration() && type == CacheType.USER)
+                            skippedConfigs.add(cacheName);
+
+                    }
+                }
+
+                if (!F.isEmpty(skippedConfigs)) {
+                    U.warn(log, "Static configuration for the following caches will be ignored because a persistent " +
+                        "cache with the same name already exist (see " +
+                        "https://apacheignite.readme.io/docs/cache-configuration for more information): " +
+                        skippedConfigs);
+                }
+            }
+        }
+    }
+
+    /**
+     * Add stored cache data to caches storage.
+     *
+     * @param caches Cache storage.
+     * @param cacheData Cache data to add.
+     * @param cacheName Cache name.
+     * @param cacheType Cache type.
+     * @param isStaticallyConfigured Statically configured flag.
+     */
+    private void addStoredCache(
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+        StoredCacheData cacheData,
+        String cacheName,
+        CacheType cacheType,
+        boolean persistedBefore,
+        boolean isStaticallyConfigured
+    ) {
+        if (!caches.containsKey(cacheName)) {
+            if (!cacheType.userCache())
+                stopSeq.addLast(cacheName);
+            else
+                stopSeq.addFirst(cacheName);
+        }
+
+        caches.put(cacheName, new CacheJoinNodeDiscoveryData.CacheInfo(cacheData, cacheType, cacheData.sql(),
+            persistedBefore ? 1 : 0, isStaticallyConfigured));
+    }
+
+    /**
+     * @param cfg Cache configuration.
+     * @param sql SQL flag.
+     * @param caches Caches map.
+     * @param templates Templates map.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void addCacheFromConfiguration(
+        CacheConfiguration<?, ?> cfg,
+        boolean sql,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+        Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates
+    ) throws IgniteCheckedException {
+        String cacheName = cfg.getName();
+
+        CU.validateCacheName(cacheName);
+
+        cacheProcessor.cloneCheckSerializable(cfg);
+
+        CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
+
+        // Initialize defaults.
+        cacheProcessor.initialize(cfg, cacheObjCtx);
+
+        StoredCacheData cacheData = new StoredCacheData(cfg);
+
+        cacheData.sql(sql);
+
+        T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = cacheProcessor.splitter().split(cfg);
+
+        cacheData.config(splitCfg.get1());
+        cacheData.cacheConfigurationEnrichment(splitCfg.get2());
+
+        cfg = splitCfg.get1();
+
+        if (GridCacheUtils.isCacheTemplateName(cacheName))
+            templates.put(cacheName, new CacheJoinNodeDiscoveryData.CacheInfo(cacheData, CacheType.USER, false, 0, true));
+        else {
+            if (caches.containsKey(cacheName)) {
+                throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
+                    "assign unique name to each cache): " + cacheName);
+            }
+
+            CacheType cacheType = ctx.cache().cacheType(cacheName);
+
+            if (cacheType != CacheType.USER && cfg.getDataRegionName() == null)
+                cfg.setDataRegionName(cacheProcessor.context().database().systemDateRegionName());
+
+            addStoredCache(caches, cacheData, cacheName, cacheType, false, true);
+        }
+    }
+
+    /**
+     * Validates cache configuration against stored cache configuration when persistence is enabled.
+     *
+     * @param cfg Configured cache configuration.
+     * @param cfgFromStore Stored cache configuration
+     * @throws IgniteCheckedException If validation failed.
+     */
+    private void validateCacheConfigurationOnRestore(CacheConfiguration cfg, CacheConfiguration cfgFromStore)
+        throws IgniteCheckedException {
+        assert cfg != null && cfgFromStore != null;
+
+        if ((cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT ||
+            cfgFromStore.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
+            && cfg.getAtomicityMode() != cfgFromStore.getAtomicityMode()) {
+            throw new IgniteCheckedException("Cannot start cache. Statically configured atomicity mode differs from " +
+                "previously stored configuration. Please check your configuration: [cacheName=" + cfg.getName() +
+                ", configuredAtomicityMode=" + cfg.getAtomicityMode() +
+                ", storedAtomicityMode=" + cfgFromStore.getAtomicityMode() + "]");
+        }
+
+        boolean staticCfgVal = cfg.isEncryptionEnabled();
+
+        boolean storedVal = cfgFromStore.isEncryptionEnabled();
+
+        if (storedVal != staticCfgVal) {
+            throw new IgniteCheckedException("Encrypted flag value differs. Static config value is '" + staticCfgVal +
+                "' and value stored on the disk is '" + storedVal + "'");
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java
new file mode 100644
index 0000000..f275f28
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java
@@ -0,0 +1,746 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.expiry.EternalExpiryPolicy;
+import javax.cache.expiry.ExpiryPolicy;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DeploymentMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteFeatures;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
+import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
+import org.apache.ignite.internal.processors.query.QuerySchemaPatch;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.security.OperationSecurityContext;
+import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.security.SecurityException;
+import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
+import org.apache.ignite.spi.indexing.IndexingSpi;
+import org.apache.ignite.spi.indexing.noop.NoopIndexingSpi;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
+import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isDefaultDataRegionPersistent;
+import static org.apache.ignite.internal.processors.security.SecurityUtils.nodeSecurityContext;
+
+/**
+ * Util class for joining node validation.
+ */
+public class ValidationOnNodeJoinUtils {
+    /** Template of message of conflicts during configuration merge */
+    private static final String MERGE_OF_CONFIG_CONFLICTS_MESSAGE =
+        "Conflicts during configuration merge for cache '%s' : \n%s";
+
+    /** Template of message of node join was fail because it requires to merge of config */
+    private static final String MERGE_OF_CONFIG_REQUIRED_MESSAGE = "Failed to join node to the active cluster " +
+        "(the config of the cache '%s' has to be merged which is impossible on active grid). " +
+        "Deactivate grid and retry node join or clean the joining node.";
+
+    /** Template of message of failed node join because encryption settings are different for the same cache. */
+    private static final String ENCRYPT_MISMATCH_MESSAGE = "Failed to join node to the cluster " +
+        "(encryption settings are different for cache '%s' : local=%s, remote=%s.)";
+
+    /** Supports non default precision and scale for DECIMAL and VARCHAR types. */
+    private static final IgniteProductVersion PRECISION_SCALE_SINCE_VER = IgniteProductVersion.fromString("2.7.0");
+
+    /** Invalid region configuration message. */
+    private static final String INVALID_REGION_CONFIGURATION_MESSAGE = "Failed to join node " +
+        "(Incompatible data region configuration [region=%s, locNodeId=%s, isPersistenceEnabled=%s, rmtNodeId=%s, isPersistenceEnabled=%s])";
+
+    /**
+     * Checks a joining node to configuration consistency.
+     *
+     * @param node Node.
+     * @param discoData Disco data.
+     * @param marsh Marsh.
+     * @param ctx Context.
+     * @param cacheDescProvider Cache descriptor provider.
+     */
+    @Nullable static IgniteNodeValidationResult validateNode(
+        ClusterNode node,
+        DiscoveryDataBag.JoiningNodeDiscoveryData discoData,
+        Marshaller marsh,
+        GridKernalContext ctx,
+        Function<String, DynamicCacheDescriptor> cacheDescProvider
+    ) {
+        if (discoData.hasJoiningNodeData() && discoData.joiningNodeData() instanceof CacheJoinNodeDiscoveryData) {
+            CacheJoinNodeDiscoveryData nodeData = (CacheJoinNodeDiscoveryData)discoData.joiningNodeData();
+
+            boolean isGridActive = ctx.state().clusterState().active();
+
+            StringBuilder errorMsg = new StringBuilder();
+
+            if (!node.isClient()) {
+                validateRmtRegions(node, ctx).forEach(error -> {
+                    if (errorMsg.length() > 0)
+                        errorMsg.append("\n");
+
+                    errorMsg.append(error);
+                });
+            }
+
+            SecurityContext secCtx = null;
+
+            if (ctx.security().enabled()) {
+                try {
+                    secCtx = nodeSecurityContext(marsh, U.resolveClassLoader(ctx.config()), node);
+                }
+                catch (SecurityException se) {
+                    errorMsg.append(se.getMessage());
+                }
+            }
+
+            for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : nodeData.caches().values()) {
+                if (secCtx != null && cacheInfo.cacheType() == CacheType.USER) {
+                    try (OperationSecurityContext s = ctx.security().withContext(secCtx)) {
+                        GridCacheProcessor.authorizeCacheCreate(cacheInfo.cacheData().config(), ctx);
+                    }
+                    catch (SecurityException ex) {
+                        if (errorMsg.length() > 0)
+                            errorMsg.append("\n");
+
+                        errorMsg.append(ex.getMessage());
+                    }
+                }
+
+                DynamicCacheDescriptor locDesc = cacheDescProvider.apply(cacheInfo.cacheData().config().getName());
+
+                if (locDesc == null)
+                    continue;
+
+                QuerySchemaPatch schemaPatch = locDesc.makeSchemaPatch(cacheInfo.cacheData().queryEntities());
+
+                if (schemaPatch.hasConflicts() || (isGridActive && !schemaPatch.isEmpty())) {
+                    if (errorMsg.length() > 0)
+                        errorMsg.append("\n");
+
+                    if (schemaPatch.hasConflicts())
+                        errorMsg.append(String.format(MERGE_OF_CONFIG_CONFLICTS_MESSAGE,
+                            locDesc.cacheName(), schemaPatch.getConflictsMessage()));
+                    else
+                        errorMsg.append(String.format(MERGE_OF_CONFIG_REQUIRED_MESSAGE, locDesc.cacheName()));
+                }
+
+                // This check must be done on join, otherwise group encryption key will be
+                // written to metastore regardless of validation check and could trigger WAL write failures.
+                boolean locEnc = locDesc.cacheConfiguration().isEncryptionEnabled();
+                boolean rmtEnc = cacheInfo.cacheData().config().isEncryptionEnabled();
+
+                if (locEnc != rmtEnc) {
+                    if (errorMsg.length() > 0)
+                        errorMsg.append("\n");
+
+                    // Message will be printed on remote node, so need to swap local and remote.
+                    errorMsg.append(String.format(ENCRYPT_MISMATCH_MESSAGE, locDesc.cacheName(), rmtEnc, locEnc));
+                }
+            }
+
+            if (errorMsg.length() > 0) {
+                String msg = errorMsg.toString();
+
+                return new IgniteNodeValidationResult(node.id(), msg);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @param c Ignite configuration.
+     * @param cc Configuration to validate.
+     * @param cacheType Cache type.
+     * @param cfgStore Cache store.
+     * @param ctx Context.
+     * @param log Logger.
+     * @throws IgniteCheckedException If failed.
+     */
+    static void validate(
+        IgniteConfiguration c,
+        CacheConfiguration cc,
+        CacheType cacheType,
+        @Nullable CacheStore cfgStore,
+        GridKernalContext ctx,
+        IgniteLogger log,
+        BiFunction<Boolean, String, IgniteCheckedException> assertParam
+    ) throws IgniteCheckedException {
+        apply(assertParam, cc.getName() != null && !cc.getName().isEmpty(), "name is null or empty");
+
+        if (cc.getCacheMode() == REPLICATED) {
+            if (cc.getNearConfiguration() != null &&
+                ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) {
+                U.warn(log, "Near cache cannot be used with REPLICATED cache, " +
+                    "will be ignored [cacheName=" + U.maskName(cc.getName()) + ']');
+
+                cc.setNearConfiguration(null);
+            }
+        }
+
+        if (storesLocallyOnClient(c, cc, ctx))
+            throw new IgniteCheckedException("DataRegion for client caches must be explicitly configured " +
+                "on client node startup. Use DataStorageConfiguration to configure DataRegion.");
+
+        if (cc.getCacheMode() == LOCAL && !cc.getAffinity().getClass().equals(GridCacheProcessor.LocalAffinityFunction.class))
+            U.warn(log, "AffinityFunction configuration parameter will be ignored for local cache [cacheName=" +
+                U.maskName(cc.getName()) + ']');
+
+        if (cc.getAffinity().partitions() > CacheConfiguration.MAX_PARTITIONS_COUNT)
+            throw new IgniteCheckedException("Cannot have more than " + CacheConfiguration.MAX_PARTITIONS_COUNT +
+                " partitions [cacheName=" + cc.getName() + ", partitions=" + cc.getAffinity().partitions() + ']');
+
+        if (cc.getRebalanceMode() != CacheRebalanceMode.NONE) {
+            apply(assertParam, cc.getRebalanceBatchSize() > 0, "rebalanceBatchSize > 0");
+            apply(assertParam, cc.getRebalanceTimeout() >= 0, "rebalanceTimeout >= 0");
+            apply(assertParam, cc.getRebalanceThrottle() >= 0, "rebalanceThrottle >= 0");
+            apply(assertParam, cc.getRebalanceBatchesPrefetchCount() > 0, "rebalanceBatchesPrefetchCount > 0");
+        }
+
+        if (cc.getCacheMode() == PARTITIONED || cc.getCacheMode() == REPLICATED) {
+            if (cc.getAtomicityMode() == ATOMIC && cc.getWriteSynchronizationMode() == FULL_ASYNC)
+                U.warn(log, "Cache write synchronization mode is set to FULL_ASYNC. All single-key 'put' and " +
+                    "'remove' operations will return 'null', all 'putx' and 'removex' operations will return" +
+                    " 'true' [cacheName=" + U.maskName(cc.getName()) + ']');
+        }
+
+        DeploymentMode depMode = c.getDeploymentMode();
+
+        if (c.isPeerClassLoadingEnabled() && (depMode == PRIVATE || depMode == ISOLATED) &&
+            !CU.isSystemCache(cc.getName()) && !(c.getMarshaller() instanceof BinaryMarshaller))
+            throw new IgniteCheckedException("Cache can be started in PRIVATE or ISOLATED deployment mode only when" +
+                " BinaryMarshaller is used [depMode=" + ctx.config().getDeploymentMode() + ", marshaller=" +
+                c.getMarshaller().getClass().getName() + ']');
+
+        if (cc.getAffinity().partitions() > CacheConfiguration.MAX_PARTITIONS_COUNT)
+            throw new IgniteCheckedException("Affinity function must return at most " +
+                CacheConfiguration.MAX_PARTITIONS_COUNT + " partitions [actual=" + cc.getAffinity().partitions() +
+                ", affFunction=" + cc.getAffinity() + ", cacheName=" + cc.getName() + ']');
+
+        if (cc.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT) {
+            apply(assertParam, cc.getCacheMode() != LOCAL,
+                "LOCAL cache mode cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+
+            apply(assertParam, cc.getNearConfiguration() == null,
+                "near cache cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+
+            apply(assertParam, !cc.isReadThrough(),
+                "readThrough cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+
+            apply(assertParam, !cc.isWriteThrough(),
+                "writeThrough cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+
+            apply(assertParam, !cc.isWriteBehindEnabled(),
+                "writeBehindEnabled cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+
+            apply(assertParam, cc.getRebalanceMode() != NONE,
+                "Rebalance mode NONE cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+
+            ExpiryPolicy expPlc = null;
+
+            if (cc.getExpiryPolicyFactory() instanceof FactoryBuilder.SingletonFactory)
+                expPlc = (ExpiryPolicy)cc.getExpiryPolicyFactory().create();
+
+            if (!(expPlc instanceof EternalExpiryPolicy)) {
+                apply(assertParam, cc.getExpiryPolicyFactory() == null,
+                    "expiry policy cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+            }
+
+            apply(assertParam, cc.getInterceptor() == null,
+                "interceptor cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+
+            // Disable in-memory evictions for mvcc cache. TODO IGNITE-10738
+            String memPlcName = cc.getDataRegionName();
+            DataRegion dataRegion = ctx.cache().context().database().dataRegion(memPlcName);
+
+            if (dataRegion != null && !dataRegion.config().isPersistenceEnabled() &&
+                dataRegion.config().getPageEvictionMode() != DataPageEvictionMode.DISABLED) {
+                throw new IgniteCheckedException("Data pages evictions cannot be used with TRANSACTIONAL_SNAPSHOT " +
+                    "cache atomicity mode for in-memory regions. Please, either disable evictions or enable " +
+                    "persistence for data regions with TRANSACTIONAL_SNAPSHOT caches. [cacheName=" + cc.getName() +
+                    ", dataRegionName=" + memPlcName + ", pageEvictionMode=" +
+                    dataRegion.config().getPageEvictionMode() + ']');
+            }
+
+            IndexingSpi idxSpi = ctx.config().getIndexingSpi();
+
+            apply(assertParam, idxSpi == null || idxSpi instanceof NoopIndexingSpi,
+                "Custom IndexingSpi cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+        }
+
+        if (cc.isWriteBehindEnabled() && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) {
+            if (cfgStore == null)
+                throw new IgniteCheckedException("Cannot enable write-behind (writer or store is not provided) " +
+                    "for cache: " + U.maskName(cc.getName()));
+
+            apply(assertParam, cc.getWriteBehindBatchSize() > 0, "writeBehindBatchSize > 0");
+            apply(assertParam, cc.getWriteBehindFlushSize() >= 0, "writeBehindFlushSize >= 0");
+            apply(assertParam, cc.getWriteBehindFlushFrequency() >= 0, "writeBehindFlushFrequency >= 0");
+            apply(assertParam, cc.getWriteBehindFlushThreadCount() > 0, "writeBehindFlushThreadCount > 0");
+
+            if (cc.getWriteBehindFlushSize() == 0 && cc.getWriteBehindFlushFrequency() == 0)
+                throw new IgniteCheckedException("Cannot set both 'writeBehindFlushFrequency' and " +
+                    "'writeBehindFlushSize' parameters to 0 for cache: " + U.maskName(cc.getName()));
+        }
+
+        if (cc.isReadThrough() && cfgStore == null
+            && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()))
+            throw new IgniteCheckedException("Cannot enable read-through (loader or store is not provided) " +
+                "for cache: " + U.maskName(cc.getName()));
+
+        if (cc.isWriteThrough() && cfgStore == null
+            && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()))
+            throw new IgniteCheckedException("Cannot enable write-through (writer or store is not provided) " +
+                "for cache: " + U.maskName(cc.getName()));
+
+        long delay = cc.getRebalanceDelay();
+
+        if (delay != 0) {
+            if (cc.getCacheMode() != PARTITIONED)
+                U.warn(log, "Rebalance delay is supported only for partitioned caches (will ignore): " + (cc.getName()));
+            else if (cc.getRebalanceMode() == SYNC) {
+                if (delay < 0) {
+                    U.warn(log, "Ignoring SYNC rebalance mode with manual rebalance start (node will not wait for " +
+                        "rebalancing to be finished): " + U.maskName(cc.getName()));
+                }
+                else {
+                    U.warn(log, "Using SYNC rebalance mode with rebalance delay (node will wait until rebalancing is " +
+                        "initiated for " + delay + "ms) for cache: " + U.maskName(cc.getName()));
+                }
+            }
+        }
+
+        ctx.coordinators().validateCacheConfiguration(cc);
+
+        if (cc.getAtomicityMode() == ATOMIC)
+            apply(assertParam, cc.getTransactionManagerLookupClassName() == null,
+                "transaction manager can not be used with ATOMIC cache");
+
+        if ((cc.getEvictionPolicyFactory() != null || cc.getEvictionPolicy() != null) && !cc.isOnheapCacheEnabled())
+            throw new IgniteCheckedException("Onheap cache must be enabled if eviction policy is configured [cacheName="
+                + U.maskName(cc.getName()) + "]");
+
+        if (cacheType != CacheType.DATA_STRUCTURES && DataStructuresProcessor.isDataStructureCache(cc.getName()))
+            throw new IgniteCheckedException("Using cache names reserved for datastructures is not allowed for " +
+                "other cache types [cacheName=" + cc.getName() + ", cacheType=" + cacheType + "]");
+
+        if (cacheType != CacheType.DATA_STRUCTURES && DataStructuresProcessor.isReservedGroup(cc.getGroupName()))
+            throw new IgniteCheckedException("Using cache group names reserved for datastructures is not allowed for " +
+                "other cache types [cacheName=" + cc.getName() + ", groupName=" + cc.getGroupName() +
+                ", cacheType=" + cacheType + "]");
+
+        // Make sure we do not use sql schema for system views.
+        if (ctx.query().moduleEnabled()) {
+            String schema = QueryUtils.normalizeSchemaName(cc.getName(), cc.getSqlSchema());
+
+            if (F.eq(schema, QueryUtils.SCHEMA_SYS)) {
+                if (cc.getSqlSchema() == null) {
+                    // Conflict on cache name.
+                    throw new IgniteCheckedException("SQL schema name derived from cache name is reserved (" +
+                        "please set explicit SQL schema name through CacheConfiguration.setSqlSchema() or choose " +
+                        "another cache name) [cacheName=" + cc.getName() + ", schemaName=" + cc.getSqlSchema() + "]");
+                }
+                else {
+                    // Conflict on schema name.
+                    throw new IgniteCheckedException("SQL schema name is reserved (please choose another one) [" +
+                        "cacheName=" + cc.getName() + ", schemaName=" + cc.getSqlSchema() + ']');
+                }
+            }
+        }
+
+        if (cc.isEncryptionEnabled() && !ctx.clientNode()) {
+            if (!CU.isPersistentCache(cc, c.getDataStorageConfiguration())) {
+                throw new IgniteCheckedException("Using encryption is not allowed" +
+                    " for not persistent cache  [cacheName=" + cc.getName() + ", groupName=" + cc.getGroupName() +
+                    ", cacheType=" + cacheType + "]");
+            }
+
+            EncryptionSpi encSpi = c.getEncryptionSpi();
+
+            if (encSpi == null) {
+                throw new IgniteCheckedException("EncryptionSpi should be configured to use encrypted cache " +
+                    "[cacheName=" + cc.getName() + ", groupName=" + cc.getGroupName() +
+                    ", cacheType=" + cacheType + "]");
+            }
+        }
+
+        Collection<QueryEntity> ents = cc.getQueryEntities();
+
+        if (ctx.discovery().discoCache() != null) {
+            boolean nonDfltPrecScaleExists = ents.stream().anyMatch(
+                e -> !F.isEmpty(e.getFieldsPrecision()) || !F.isEmpty(e.getFieldsScale()));
+
+            if (nonDfltPrecScaleExists) {
+                ClusterNode oldestNode = ctx.discovery().discoCache().oldestServerNode();
+
+                if (PRECISION_SCALE_SINCE_VER.compareTo(oldestNode.version()) > 0) {
+                    throw new IgniteCheckedException("Non default precision and scale is supported since version 2.7. " +
+                        "The node with oldest version [node=" + oldestNode + ']');
+                }
+            }
+        }
+    }
+
+    /**
+     * @param ctx Context.
+     * @param log Logger.
+     * @throws IgniteCheckedException if check failed.
+     */
+    static void checkConsistency(GridKernalContext ctx, IgniteLogger log) throws IgniteCheckedException {
+        Collection<ClusterNode> rmtNodes = ctx.discovery().remoteNodes();
+
+        boolean changeablePoolSize =
+            IgniteFeatures.allNodesSupports(rmtNodes, IgniteFeatures.DIFFERENT_REBALANCE_POOL_SIZE);
+
+        for (ClusterNode n : rmtNodes) {
+            if (Boolean.TRUE.equals(n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED)))
+                continue;
+
+            if (!changeablePoolSize)
+                checkRebalanceConfiguration(n, ctx);
+
+            checkTransactionConfiguration(n, ctx, log);
+
+            checkMemoryConfiguration(n, ctx);
+
+            DeploymentMode locDepMode = ctx.config().getDeploymentMode();
+            DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
+
+            CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
+                locDepMode, rmtDepMode, true);
+        }
+    }
+
+    /**
+     * @param rmtNode Joining node.
+     * @param ctx Context
+     * @return List of validation errors.
+     */
+    private static List<String> validateRmtRegions(ClusterNode rmtNode, GridKernalContext ctx) {
+        List<String> errorMessages = new ArrayList<>();
+
+        DataStorageConfiguration rmtStorageCfg = extractDataStorage(rmtNode, ctx);
+        Map<String, DataRegionConfiguration> rmtRegionCfgs = dataRegionCfgs(rmtStorageCfg);
+
+        DataStorageConfiguration locStorageCfg = ctx.config().getDataStorageConfiguration();
+
+        if (isDefaultDataRegionPersistent(locStorageCfg) != isDefaultDataRegionPersistent(rmtStorageCfg)) {
+            errorMessages.add(String.format(
+                INVALID_REGION_CONFIGURATION_MESSAGE,
+                "DEFAULT",
+                ctx.localNodeId(),
+                isDefaultDataRegionPersistent(locStorageCfg),
+                rmtNode.id(),
+                isDefaultDataRegionPersistent(rmtStorageCfg)
+            ));
+        }
+
+        for (ClusterNode clusterNode : ctx.discovery().aliveServerNodes()) {
+            Map<String, DataRegionConfiguration> nodeRegionCfg = dataRegionCfgs(extractDataStorage(clusterNode, ctx));
+
+            for (Map.Entry<String, DataRegionConfiguration> nodeRegionCfgEntry : nodeRegionCfg.entrySet()) {
+                String regionName = nodeRegionCfgEntry.getKey();
+
+                DataRegionConfiguration rmtRegionCfg = rmtRegionCfgs.get(regionName);
+
+                if (rmtRegionCfg != null && rmtRegionCfg.isPersistenceEnabled() != nodeRegionCfgEntry.getValue().isPersistenceEnabled())
+                    errorMessages.add(String.format(
+                        INVALID_REGION_CONFIGURATION_MESSAGE,
+                        regionName,
+                        ctx.localNodeId(),
+                        nodeRegionCfgEntry.getValue().isPersistenceEnabled(),
+                        rmtNode.id(),
+                        rmtRegionCfg.isPersistenceEnabled()
+                    ));
+            }
+        }
+
+        return errorMessages;
+    }
+
+    /**
+     * @param assertParam Assert parameter.
+     * @param cond The condition result.
+     * @param condDesc The description of condition.
+     */
+    private static void apply(
+        BiFunction<Boolean, String, IgniteCheckedException> assertParam,
+        Boolean cond,
+        String condDesc
+    ) throws IgniteCheckedException {
+        IgniteCheckedException apply = assertParam.apply(cond, condDesc);
+
+        if (apply != null)
+            throw apply;
+    }
+
+    /**
+     * @param c Ignite Configuration.
+     * @param cc Cache Configuration.
+     * @param ctx Context.
+     * @return {@code true} if cache is starting on client node and this node is affinity node for the cache.
+     */
+    private static boolean storesLocallyOnClient(IgniteConfiguration c, CacheConfiguration cc, GridKernalContext ctx) {
+        if (c.isClientMode() && c.getDataStorageConfiguration() == null) {
+            if (cc.getCacheMode() == LOCAL)
+                return true;
+
+            return ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName());
+
+        }
+        else
+            return false;
+    }
+
+    /**
+     * @param rmt Remote node to check.
+     * @param ctx Context.
+     * @throws IgniteCheckedException If check failed.
+     */
+    private static void checkRebalanceConfiguration(
+        ClusterNode rmt,
+        GridKernalContext ctx
+    ) throws IgniteCheckedException {
+        ClusterNode locNode = ctx.discovery().localNode();
+
+        if (ctx.config().isClientMode() || locNode.isDaemon() || rmt.isClient() || rmt.isDaemon())
+            return;
+
+        Integer rebalanceThreadPoolSize = rmt.attribute(IgniteNodeAttributes.ATTR_REBALANCE_POOL_SIZE);
+
+        if (rebalanceThreadPoolSize != null && rebalanceThreadPoolSize != ctx.config().getRebalanceThreadPoolSize()) {
+            throw new IgniteCheckedException("Rebalance configuration mismatch (fix configuration or set -D" +
+                IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property)." +
+                " Different values of such parameter may lead to rebalance process instability and hanging. " +
+                " [rmtNodeId=" + rmt.id() +
+                ", locRebalanceThreadPoolSize = " + ctx.config().getRebalanceThreadPoolSize() +
+                ", rmtRebalanceThreadPoolSize = " + rebalanceThreadPoolSize + "]");
+        }
+    }
+
+    /**
+     * @param rmt Remote node to check.
+     * @param ctx Context.
+     * @param log Logger.
+     * @throws IgniteCheckedException If check failed.
+     */
+    private static void checkTransactionConfiguration(
+        ClusterNode rmt,
+        GridKernalContext ctx,
+        IgniteLogger log
+    ) throws IgniteCheckedException {
+        TransactionConfiguration rmtTxCfg = rmt.attribute(ATTR_TX_CONFIG);
+
+        if (rmtTxCfg != null) {
+            TransactionConfiguration locTxCfg = ctx.config().getTransactionConfiguration();
+
+            checkDeadlockDetectionConfig(rmt, rmtTxCfg, locTxCfg, log);
+
+            checkSerializableEnabledConfig(rmt, rmtTxCfg, locTxCfg);
+        }
+    }
+
+    /**
+     *
+     */
+    private static void checkDeadlockDetectionConfig(
+        ClusterNode rmt,
+        TransactionConfiguration rmtTxCfg,
+        TransactionConfiguration locTxCfg,
+        IgniteLogger log
+    ) {
+        boolean locDeadlockDetectionEnabled = locTxCfg.getDeadlockTimeout() > 0;
+        boolean rmtDeadlockDetectionEnabled = rmtTxCfg.getDeadlockTimeout() > 0;
+
+        if (locDeadlockDetectionEnabled != rmtDeadlockDetectionEnabled) {
+            U.warn(log, "Deadlock detection is enabled on one node and disabled on another. " +
+                "Disabled detection on one node can lead to undetected deadlocks. [rmtNodeId=" + rmt.id() +
+                ", locDeadlockTimeout=" + locTxCfg.getDeadlockTimeout() +
+                ", rmtDeadlockTimeout=" + rmtTxCfg.getDeadlockTimeout());
+        }
+    }
+
+    /**
+     *
+     */
+    private static void checkSerializableEnabledConfig(
+        ClusterNode rmt,
+        TransactionConfiguration rmtTxCfg,
+        TransactionConfiguration locTxCfg
+    ) throws IgniteCheckedException {
+        if (locTxCfg.isTxSerializableEnabled() != rmtTxCfg.isTxSerializableEnabled())
+            throw new IgniteCheckedException("Serializable transactions enabled mismatch " +
+                "(fix txSerializableEnabled property or set -D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true " +
+                "system property) [rmtNodeId=" + rmt.id() +
+                ", locTxSerializableEnabled=" + locTxCfg.isTxSerializableEnabled() +
+                ", rmtTxSerializableEnabled=" + rmtTxCfg.isTxSerializableEnabled() + ']');
+    }
+
+    /**
+     * @param rmt Remote node to check.
+     * @param ctx Context.
+     * @throws IgniteCheckedException If check failed.
+     */
+    private static void checkMemoryConfiguration(ClusterNode rmt, GridKernalContext ctx) throws IgniteCheckedException {
+        ClusterNode locNode = ctx.discovery().localNode();
+
+        if (ctx.config().isClientMode() || locNode.isDaemon() || rmt.isClient() || rmt.isDaemon())
+            return;
+
+        DataStorageConfiguration dsCfg = null;
+
+        Object dsCfgBytes = rmt.attribute(IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG);
+
+        if (dsCfgBytes instanceof byte[])
+            dsCfg = new JdkMarshaller().unmarshal((byte[])dsCfgBytes, U.resolveClassLoader(ctx.config()));
+
+        if (dsCfg == null) {
+            // Try to use legacy memory configuration.
+            MemoryConfiguration memCfg = rmt.attribute(IgniteNodeAttributes.ATTR_MEMORY_CONFIG);
+
+            if (memCfg != null) {
+                dsCfg = new DataStorageConfiguration();
+
+                // All properties that are used in validation should be converted here.
+                dsCfg.setPageSize(memCfg.getPageSize());
+            }
+        }
+
+        if (dsCfg != null) {
+            DataStorageConfiguration locDsCfg = ctx.config().getDataStorageConfiguration();
+
+            if (dsCfg.getPageSize() != locDsCfg.getPageSize()) {
+                throw new IgniteCheckedException("Memory configuration mismatch (fix configuration or set -D" +
+                    IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property) [rmtNodeId=" + rmt.id() +
+                    ", locPageSize = " + locDsCfg.getPageSize() + ", rmtPageSize = " + dsCfg.getPageSize() + "]");
+            }
+        }
+    }
+
+    /**
+     * @param node Joining node.
+     * @param ctx Context.
+     * @param map Cache descriptors.
+     * @return Validation result or {@code null} in case of success.
+     */
+    @Nullable static IgniteNodeValidationResult validateHashIdResolvers(
+        ClusterNode node,
+        GridKernalContext ctx,
+        Map<String, DynamicCacheDescriptor> map
+    ) {
+        if (!node.isClient()) {
+            for (DynamicCacheDescriptor desc : map.values()) {
+                CacheConfiguration cfg = desc.cacheConfiguration();
+
+                if (cfg.getAffinity() instanceof RendezvousAffinityFunction) {
+                    RendezvousAffinityFunction aff = (RendezvousAffinityFunction)cfg.getAffinity();
+
+                    Object nodeHashObj = aff.resolveNodeHash(node);
+
+                    for (ClusterNode topNode : ctx.discovery().aliveServerNodes()) {
+                        Object topNodeHashObj = aff.resolveNodeHash(topNode);
+
+                        if (nodeHashObj.hashCode() == topNodeHashObj.hashCode()) {
+                            String errMsg = "Failed to add node to topology because it has the same hash code for " +
+                                "partitioned affinity as one of existing nodes [cacheName=" +
+                                cfg.getName() + ", existingNodeId=" + topNode.id() + ']';
+
+                            String sndMsg = "Failed to add node to topology because it has the same hash code for " +
+                                "partitioned affinity as one of existing nodes [cacheName=" +
+                                cfg.getName() + ", existingNodeId=" + topNode.id() + ']';
+
+                            return new IgniteNodeValidationResult(topNode.id(), errMsg, sndMsg);
+                        }
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * @param rmtNode Remote node to check.
+     * @param ctx Context.
+     * @return Data storage configuration
+     */
+    private static DataStorageConfiguration extractDataStorage(ClusterNode rmtNode, GridKernalContext ctx) {
+        return GridCacheUtils.extractDataStorage(
+            rmtNode,
+            ctx.marshallerContext().jdkMarshaller(),
+            U.resolveClassLoader(ctx.config())
+        );
+    }
+
+    /**
+     * @param dataStorageCfg User-defined data regions.
+     */
+    private static Map<String, DataRegionConfiguration> dataRegionCfgs(DataStorageConfiguration dataStorageCfg) {
+        if (dataStorageCfg != null) {
+            return Optional.ofNullable(dataStorageCfg.getDataRegionConfigurations())
+                .map(Stream::of)
+                .orElseGet(Stream::empty)
+                .collect(Collectors.toMap(DataRegionConfiguration::getName, e -> e));
+        }
+
+        return Collections.emptyMap();
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
index cc573e5..93b2fbb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
@@ -817,7 +817,11 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
 
         stopAllGrids();
 
-        startGrids(5);
+        startGrid(1); //TODO https://issues.apache.org/jira/browse/IGNITE-8717 (replace with startGrids(5); //after)
+        startGrid(0);
+        startGrid(2);
+        startGrid(3);
+        startGrid(4);
 
         GridTestUtils.waitForCondition(() -> grid(0).cluster().active(), getTestTimeout());
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java
index 3c42b30..ed274ec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.persistence;
 
 import org.apache.ignite.Ignite;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -61,6 +62,7 @@ public class IgnitePdsDestroyCacheTest extends IgnitePdsDestroyCacheAbstractTest
      *
      * @throws Exception If failed.
      */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-8717")
     @Test
     public void testDestroyCachesAbruptly() throws Exception {
         Ignite ignite = startGrids(NODES);
@@ -77,6 +79,7 @@ public class IgnitePdsDestroyCacheTest extends IgnitePdsDestroyCacheAbstractTest
      *
      * @throws Exception If failed.
      */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-8717")
     @Test
     public void testDestroyGroupCachesAbruptly() throws Exception {
         Ignite ignite = startGrids(NODES);
diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java
index 7178feb..06f92f4 100644
--- a/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java
@@ -148,8 +148,8 @@ public class GridMarshallerMappingConsistencyTest extends GridCommonAbstractTest
 
         stopAllGrids();
 
-        Ignite g2 = startGrid(2);
         startGrid(1);
+        Ignite g2 = startGrid(2);
 
         assertTrue("Failed to wait for automatic grid activation",
             GridTestUtils.waitForCondition(() -> g2.cluster().active(), getTestTimeout()));
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteCacheGroupsWithRestartsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteCacheGroupsWithRestartsTest.java
index 865aee5..833af0e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteCacheGroupsWithRestartsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteCacheGroupsWithRestartsTest.java
@@ -27,6 +27,7 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiFunction;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.QueryIndexType;
@@ -39,8 +40,10 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.commandline.CommandHandler;
 import org.apache.ignite.internal.commandline.cache.argument.FindAndDeleteGarbageArg;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.VisorTaskArgument;
 import org.apache.ignite.internal.visor.cache.VisorFindAndDeleteGarbageInPersistenceJobResult;
 import org.apache.ignite.internal.visor.cache.VisorFindAndDeleteGarbageInPersistenceTask;
@@ -48,18 +51,16 @@ import org.apache.ignite.internal.visor.cache.VisorFindAndDeleteGarbageInPersist
 import org.apache.ignite.internal.visor.cache.VisorFindAndDeleteGarbageInPersistenceTaskResult;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
 
 /**
- * Testing corner cases in cache group functionality:
- * -stopping cache in shared group and immediate node leaving;
- * -starting cache in shared group with the same name as destroyed one;
- * -etc.
+ * Testing corner cases in cache group functionality: -stopping cache in shared group and immediate node leaving;
+ * -starting cache in shared group with the same name as destroyed one; -etc.
  */
 @WithSystemProperty(key=IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, value="true")
 @SuppressWarnings({"unchecked", "ThrowableNotThrown"})
@@ -67,10 +68,17 @@ public class IgniteCacheGroupsWithRestartsTest extends GridCommonAbstractTest {
     /** Group name. */
     public static final String GROUP = "group";
 
+    /**
+     *
+     */
+    private volatile boolean startExtraStaticCache;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration configuration = super.getConfiguration(gridName);
 
+        configuration.setConsistentId(gridName);
+
         configuration.setConnectorConfiguration(new ConnectorConfiguration());
 
         DataStorageConfiguration cfg = new DataStorageConfiguration();
@@ -81,6 +89,9 @@ public class IgniteCacheGroupsWithRestartsTest extends GridCommonAbstractTest {
 
         configuration.setDataStorageConfiguration(cfg);
 
+        if (startExtraStaticCache)
+            configuration.setCacheConfiguration(getCacheConfiguration(3));
+
         return configuration;
     }
 
@@ -147,7 +158,7 @@ public class IgniteCacheGroupsWithRestartsTest extends GridCommonAbstractTest {
 
         assertNull(ex.cachex(getCacheName(0)));
 
-        IgniteProcessProxy.kill(grid(2).configuration().getIgniteInstanceName());
+        stopGrid(2, true);
 
         startGrid(2);
 
@@ -164,11 +175,88 @@ public class IgniteCacheGroupsWithRestartsTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     @Test
+    public void testNodeRestartBetweenCacheStop() throws Exception {
+        IgniteEx ex = startGrids(3);
+
+        prepareCachesAndData(ex);
+
+        stopGrid(2, true);
+
+        ex.destroyCache(getCacheName(0));
+
+        assertNull(ex.cachex(getCacheName(0)));
+
+        try {
+            startGrid(2);
+
+            fail();
+        }
+        catch (Exception e) {
+            List<Throwable> list = X.getThrowableList(e);
+
+            assertTrue(list.stream().
+                anyMatch(x -> x.getMessage().
+                    contains("Joining node has caches with data which are not presented on cluster")));
+        }
+
+        removeCacheDir(getTestIgniteInstanceName(2), "cacheGroup-group");
+
+        IgniteEx node2 = startGrid(2);
+
+        assertEquals(3, node2.cluster().nodes().size());
+    }
+
+    /**
+     * @param instanceName Instance name.
+     * @param cacheGroup Cache group.
+     */
+    private void removeCacheDir(String instanceName, String cacheGroup) throws IgniteCheckedException {
+        String dn2DirName = instanceName.replace(".", "_");
+
+        U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(),
+            DFLT_STORE_DIR + "/" + dn2DirName + "/" + cacheGroup, true));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Ignore("https://ggsystems.atlassian.net/browse/GG-21755")
+    @Test
+    public void testNodeRestartWithNewStaticallyConfiguredCache() throws Exception {
+        IgniteEx ex = startGrids(3);
+
+        prepareCachesAndData(ex);
+
+        stopGrid(2, true);
+
+        assertNull(ex.cachex(getCacheName(3)));
+
+        startExtraStaticCache = true;
+
+        IgniteEx node2;
+        try {
+            node2 = startGrid(2);
+        }
+        finally {
+            startExtraStaticCache = false;
+        }
+
+        assertNotNull(ex.cachex(getCacheName(3)));
+        assertNotNull(node2.cachex(getCacheName(3)));
+
+        IgniteCache<Object, Object> cache = ex.cache(getCacheName(3));
+
+        assertEquals(0, cache.size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
     public void testCleaningGarbageAfterCacheDestroyedAndNodeStop() throws Exception {
         testFindAndDeleteGarbage(this::executeTask);
     }
 
-
     /**
      * @throws Exception If failed.
      */
@@ -245,24 +333,24 @@ public class IgniteCacheGroupsWithRestartsTest extends GridCommonAbstractTest {
 
     /**
      * @param ignite Ignite to execute task on.
-     * @param deleteFoundGarbage If clearing mode should be used.
+     * @param delFoundGarbage If clearing mode should be used.
      * @return Result of task run.
      */
     private VisorFindAndDeleteGarbageInPersistenceTaskResult executeTaskViaControlConsoleUtil(
         IgniteEx ignite,
-        boolean deleteFoundGarbage
+        boolean delFoundGarbage
     ) {
-        CommandHandler handler = new CommandHandler();
+        CommandHandler hnd = new CommandHandler();
 
         List<String> args = new ArrayList<>(Arrays.asList("--yes", "--port", "11212", "--cache", "find_garbage",
             ignite.localNode().id().toString()));
 
-        if (deleteFoundGarbage)
+        if (delFoundGarbage)
             args.add(FindAndDeleteGarbageArg.DELETE.argName());
 
-        handler.execute(args);
+        hnd.execute(args);
 
-        return handler.getLastOperationResult();
+        return hnd.getLastOperationResult();
     }
 
     /**
@@ -288,7 +376,9 @@ public class IgniteCacheGroupsWithRestartsTest extends GridCommonAbstractTest {
      *
      */
     static class Account {
-        /** */
+        /**
+         *
+         */
         private final int val;
 
         /**