You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/10/22 13:33:25 UTC

ignite git commit: IGNITE-8006 Parallelize cache groups start - Fixes #4752.

Repository: ignite
Updated Branches:
  refs/heads/master e5a467272 -> e1f8f46f9


IGNITE-8006 Parallelize cache groups start - Fixes #4752.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e1f8f46f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e1f8f46f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e1f8f46f

Branch: refs/heads/master
Commit: e1f8f46f90868d377bc764b74d07812150218c71
Parents: e5a4672
Author: Anton Kalashnikov <ka...@yandex.ru>
Authored: Mon Oct 22 16:27:49 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Oct 22 16:27:49 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   7 +
 .../cache/CacheAffinitySharedManager.java       | 115 ++--
 .../processors/cache/GridCacheIoManager.java    |  22 +-
 .../GridCachePartitionExchangeManager.java      |   4 +-
 .../processors/cache/GridCacheProcessor.java    | 523 ++++++++++++++-----
 .../processors/cache/StartCacheInfo.java        | 113 ++++
 .../GridDhtPartitionsExchangeFuture.java        |  78 ++-
 .../ignite/internal/util/IgniteUtils.java       | 104 ++--
 .../internal/util/InitializationProtector.java  |  79 +++
 .../util/lang/IgniteThrowableConsumer.java      |  37 ++
 .../util/lang/IgniteThrowableRunner.java        |  30 ++
 .../distributed/CacheStartInParallelTest.java   | 219 ++++++++
 .../IgniteCrossCacheTxStoreSelfTest.java        |  44 +-
 .../internal/util/IgniteUtilsSelfTest.java      |  74 +++
 .../testsuites/IgniteCacheTestSuite7.java       |   3 +
 15 files changed, 1165 insertions(+), 287 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 521222c..6afe244 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1030,6 +1030,13 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_REBALANCE_THROTTLE_OVERRIDE = "IGNITE_REBALANCE_THROTTLE_OVERRIDE";
 
     /**
+     * Enables starting caches in parallel.
+     *
+     * Default is {@code true}.
+     */
+    public static final String IGNITE_ALLOW_START_CACHES_IN_PARALLEL = "IGNITE_ALLOW_START_CACHES_IN_PARALLEL";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index cedbde1..6e10c00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -17,12 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import javax.cache.CacheException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -30,7 +32,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
-import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteSystemProperties;
@@ -427,25 +428,43 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         Map<Integer, GridDhtAssignmentFetchFuture> fetchFuts = U.newHashMap(startDescs.size());
 
-        Set<String> startedCaches = U.newHashSet(startDescs.size());
-
         Map<Integer, Boolean> startedInfos = U.newHashMap(startDescs.size());
 
-        for (DynamicCacheDescriptor desc : startDescs) {
-            try {
-                startedCaches.add(desc.cacheName());
+        List<StartCacheInfo> startCacheInfos = startDescs.stream()
+            .map(desc -> {
+                DynamicCacheChangeRequest changeReq = startReqs.get(desc.cacheName());
 
-                DynamicCacheChangeRequest startReq = startReqs.get(desc.cacheName());
-
-                cctx.cache().prepareCacheStart(
-                    desc.cacheConfiguration(),
+                return new StartCacheInfo(
                     desc,
-                    startReq.nearCacheConfiguration(),
+                    changeReq.nearCacheConfiguration(),
                     topVer,
-                    startReq.disabledAfterStart()
+                    changeReq.disabledAfterStart()
                 );
+            })
+            .collect(Collectors.toList());
+
+        Set<String> startedCaches = startCacheInfos.stream()
+            .map(info -> info.getCacheDescriptor().cacheName())
+            .collect(Collectors.toSet());
+
+        try {
+            cctx.cache().prepareStartCaches(startCacheInfos);
+        }
+        catch (IgniteCheckedException e) {
+            cctx.cache().closeCaches(startedCaches, false);
+
+            cctx.cache().completeClientCacheChangeFuture(msg.requestId(), e);
+
+            return null;
+        }
+
+        for (StartCacheInfo startCacheInfo : startCacheInfos) {
+            try {
+                DynamicCacheDescriptor desc = startCacheInfo.getCacheDescriptor();
+
+                startedCaches.add(desc.cacheName());
 
-                startedInfos.put(desc.cacheId(), startReq.nearCacheConfiguration() != null);
+                startedInfos.put(desc.cacheId(), startCacheInfo.getReqNearCfg() != null);
 
                 CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
 
@@ -860,6 +879,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         long time = System.currentTimeMillis();
 
+        Map<StartCacheInfo, DynamicCacheChangeRequest> startCacheInfos = new LinkedHashMap<>();
+
         for (ExchangeActions.CacheActionData action : exchActions.cacheStartRequests()) {
             DynamicCacheDescriptor cacheDesc = action.descriptor();
 
@@ -895,29 +916,41 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 }
             }
 
-            try {
-                if (startCache) {
-                    cctx.cache().prepareCacheStart(
+            if (startCache) {
+                startCacheInfos.put(
+                    new StartCacheInfo(
                         req.startCacheConfiguration(),
                         cacheDesc,
                         nearCfg,
                         evts.topologyVersion(),
                         req.disabledAfterStart()
-                    );
-
-                    if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
-                        if (fut.events().discoveryCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())
-                            U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
-                    }
-                }
+                    ),
+                    req
+                );
             }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to initialize cache. Will try to rollback cache start routine. " +
-                    "[cacheName=" + req.cacheName() + ']', e);
+        }
+
+        Map<StartCacheInfo, IgniteCheckedException> failedCaches = cctx.cache().prepareStartCachesIfPossible(startCacheInfos.keySet());
 
-                cctx.cache().closeCaches(Collections.singleton(req.cacheName()), false);
+        failedCaches.forEach((cacheInfo, exception) -> {
+            U.error(log, "Failed to initialize cache. Will try to rollback cache start routine. " +
+                "[cacheName=" + cacheInfo.getStartedConfiguration().getName() + ']', exception);
 
-                cctx.cache().completeCacheStartFuture(req, false, e);
+            cctx.cache().closeCaches(Collections.singleton(cacheInfo.getStartedConfiguration().getName()), false);
+
+            cctx.cache().completeCacheStartFuture(startCacheInfos.get(cacheInfo), false, exception);
+        });
+
+        Set<StartCacheInfo> failedCacheInfos = failedCaches.keySet();
+        // Keep only caches that did NOT fail: failed ones were already closed and completed with an error above.
+        List<StartCacheInfo> cacheInfos = startCacheInfos.keySet().stream()
+            .filter(info -> !failedCacheInfos.contains(info))
+            .collect(Collectors.toList());
+
+        for (StartCacheInfo info : cacheInfos) {
+            if (fut.cacheAddedOnExchange(info.getCacheDescriptor().cacheId(), info.getCacheDescriptor().receivedFrom())) {
+                if (fut.events().discoveryCache().cacheGroupAffinityNodes(info.getCacheDescriptor().groupId()).isEmpty())
+                    U.quietAndWarn(log, "No server nodes found for cache client: " + info.getCacheDescriptor().cacheName());
             }
         }
 
@@ -952,22 +985,20 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         U.doInParallel(
             cctx.kernalContext().getSystemExecutorService(),
             startedGroups,
-            new IgniteInClosureX<CacheGroupDescriptor>() {
-                @Override public void applyx(CacheGroupDescriptor grpDesc) throws IgniteCheckedException {
-                    if (crd)
-                        initStartedGroupOnCoordinator(fut, grpDesc);
-                    else {
-                        CacheGroupContext grp = cctx.cache().cacheGroup(grpDesc.groupId());
+            grpDesc -> {
+                if (crd)
+                    initStartedGroupOnCoordinator(fut, grpDesc);
+                else {
+                    CacheGroupContext grp = cctx.cache().cacheGroup(grpDesc.groupId());
 
-                        if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.initialVersion())) {
-                            assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion();
+                    if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.initialVersion())) {
+                        assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion();
 
-                            initAffinity(cachesRegistry.group(grp.groupId()), grp.affinity(), fut);
-                        }
+                        initAffinity(cachesRegistry.group(grp.groupId()), grp.affinity(), fut);
                     }
                 }
-            },
-            null);
+            }
+        );
     }
 
     /**
@@ -1228,7 +1259,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             .collect(Collectors.toList());
 
         try {
-            U.doInParallel(cctx.kernalContext().getSystemExecutorService(), affinityCaches, c, null);
+            U.doInParallel(cctx.kernalContext().getSystemExecutorService(), affinityCaches, c::applyx);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException("Failed to execute affinity operation on cache groups", e);
@@ -1255,7 +1286,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         try {
-            U.doInParallel(cctx.kernalContext().getSystemExecutorService(), affinityCaches, c, null);
+            U.doInParallel(cctx.kernalContext().getSystemExecutorService(), affinityCaches, c::applyx);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException("Failed to execute affinity operation on cache groups", e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 2e66e5b..3116d31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -1340,22 +1340,22 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         if (msgIdx != -1) {
             Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers;
 
-            IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(hndId);
+            IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.compute(hndId, (key, clsHandlers) -> {
+                if (clsHandlers == null)
+                    clsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX];
 
-            if (cacheClsHandlers == null) {
-                cacheClsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX];
+                if(clsHandlers[msgIdx] != null)
+                    return null;
 
-                idxClsHandlers0.put(hndId, cacheClsHandlers);
-            }
+                clsHandlers[msgIdx] = c;
+
+                return clsHandlers;
+            });
 
-            if (cacheClsHandlers[msgIdx] != null)
+            if (cacheClsHandlers == null)
                 throw new IgniteException("Duplicate cache message ID found [hndId=" + hndId +
                     ", type=" + type + ']');
 
-            cacheClsHandlers[msgIdx] = c;
-
-            msgHandlers.idxClsHandlers = idxClsHandlers0;
-
             return;
         }
         else {
@@ -1572,7 +1572,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      */
     static class MessageHandlers {
         /** Indexed class handlers. */
-        volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>();
+        volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new ConcurrentHashMap<>();
 
         /** Handler registry. */
         ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>>

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 8b8efa3..b0e0d0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -109,6 +109,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -2622,7 +2623,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     err = e;
             }
             catch (Throwable e) {
-                err = e;
+                if (!(stop && X.hasCause(e, IgniteInterruptedCheckedException.class)))
+                    err = e;
             }
             finally {
                 if (err == null && !stop && !reconnectNeeded)

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 4a6bed4..59703c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import javax.management.MBeanServer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -68,6 +69,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteComponentType;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.IgniteTransactionsEx;
 import org.apache.ignite.internal.binary.BinaryContext;
@@ -136,15 +138,16 @@ import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
 import org.apache.ignite.internal.util.F0;
+import org.apache.ignite.internal.util.InitializationProtector;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridPlainClosure;
 import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -193,6 +196,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_C
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistentCache;
+import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;
 
 /**
  * Cache processor.
@@ -214,6 +218,10 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
     private final boolean walFsyncWithDedicatedWorker =
         IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER, false);
 
+    /** Enables start caches in parallel. */
+    private final boolean IGNITE_ALLOW_START_CACHES_IN_PARALLEL =
+        IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, true);
+
     /** Shared cache context. */
     private GridCacheSharedContext<?, ?> sharedCtx;
 
@@ -266,6 +274,9 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
     /** MBean group for cache group metrics */
     private final String CACHE_GRP_METRICS_MBEAN_GRP = "Cache groups";
 
+    /** Protector of initialization of specific value. */
+    private final InitializationProtector initializationProtector = new InitializationProtector();
+
     /**
      * @param ctx Kernal context.
      */
@@ -1285,71 +1296,6 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
     }
 
     /**
-     * @param cache Cache to start.
-     * @param schema Cache schema.
-     * @throws IgniteCheckedException If failed to start cache.
-     */
-    @SuppressWarnings({"TypeMayBeWeakened", "unchecked"})
-    private void startCache(GridCacheAdapter<?, ?> cache, QuerySchema schema) throws IgniteCheckedException {
-        GridCacheContext<?, ?> cacheCtx = cache.context();
-
-        CacheConfiguration cfg = cacheCtx.config();
-
-        // Intentionally compare Boolean references using '!=' below to check if the flag has been explicitly set.
-        if (cfg.isStoreKeepBinary() && cfg.isStoreKeepBinary() != CacheConfiguration.DFLT_STORE_KEEP_BINARY
-            && !(ctx.config().getMarshaller() instanceof BinaryMarshaller))
-            U.warn(log, "CacheConfiguration.isStoreKeepBinary() configuration property will be ignored because " +
-                "BinaryMarshaller is not used");
-
-        // Start managers.
-        for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx))))
-            mgr.start(cacheCtx);
-
-        cacheCtx.initConflictResolver();
-
-        if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) {
-            GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context();
-
-            // Start DHT managers.
-            for (GridCacheManager mgr : dhtManagers(dhtCtx))
-                mgr.start(dhtCtx);
-
-            dhtCtx.initConflictResolver();
-
-            // Start DHT cache.
-            dhtCtx.cache().start();
-
-            if (log.isDebugEnabled())
-                log.debug("Started DHT cache: " + dhtCtx.cache().name());
-        }
-
-        ctx.continuous().onCacheStart(cacheCtx);
-
-        cacheCtx.cache().start();
-
-        ctx.query().onCacheStart(cacheCtx, schema);
-
-        cacheCtx.onStarted();
-
-        String memPlcName = cfg.getDataRegionName();
-
-        if (memPlcName == null && ctx.config().getDataStorageConfiguration() != null)
-            memPlcName = ctx.config().getDataStorageConfiguration().getDefaultDataRegionConfiguration().getName();
-
-        if (log.isInfoEnabled()) {
-            log.info("Started cache [name=" + cfg.getName() +
-                ", id=" + cacheCtx.cacheId() +
-                (cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") +
-                ", memoryPolicyName=" + memPlcName +
-                ", mode=" + cfg.getCacheMode() +
-                ", atomicity=" + cfg.getAtomicityMode() +
-                ", backups=" + cfg.getBackups() +
-                ", mvcc=" + cacheCtx.mvccEnabled() +']' +
-                ", encryptionEnabled=" + cfg.isEncryptionEnabled() +']');
-        }
-    }
-
-    /**
      * @param cache Cache to stop.
      * @param cancel Cancel flag.
      * @param destroy Destroy data flag. Setting to <code>true</code> will remove all cache data.
@@ -1600,7 +1546,13 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
         GridCacheDrManager drMgr = pluginMgr.createComponent(GridCacheDrManager.class);
         CacheStoreManager storeMgr = pluginMgr.createComponent(CacheStoreManager.class);
 
-        storeMgr.initialize(cfgStore, sesHolders);
+        if (cfgStore == null)
+            storeMgr.initialize(cfgStore, sesHolders);
+        else
+            initializationProtector.protect(
+                cfgStore,
+                () -> storeMgr.initialize(cfgStore, sesHolders)
+            );
 
         GridCacheContext<?, ?> cacheCtx = new GridCacheContext(
             ctx,
@@ -2017,18 +1969,13 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
         IgniteInternalFuture<?> res = sharedCtx.affinity().initCachesOnLocalJoin(
             locJoinCtx.cacheGroupDescriptors(), locJoinCtx.cacheDescriptors());
 
-        for (T2<DynamicCacheDescriptor, NearCacheConfiguration> t : locJoinCtx.caches()) {
-            DynamicCacheDescriptor desc = t.get1();
+        List<StartCacheInfo> startCacheInfos = locJoinCtx.caches().stream()
+            .map(cacheInfo -> new StartCacheInfo(cacheInfo.get1(), cacheInfo.get2(), exchTopVer, false))
+            .collect(Collectors.toList());
 
-            prepareCacheStart(
-                desc.cacheConfiguration(),
-                desc,
-                t.get2(),
-                exchTopVer,
-                false);
+        prepareStartCaches(startCacheInfos);
 
-            context().exchange().exchangerUpdateHeartbeat();
-        }
+        context().exchange().exchangerUpdateHeartbeat();
 
         if (log.isInfoEnabled())
             log.info("Starting caches on local join performed in " + (System.currentTimeMillis() - time) + " ms.");
@@ -2054,24 +2001,164 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
      */
     public Collection<DynamicCacheDescriptor> startReceivedCaches(UUID nodeId, AffinityTopologyVersion exchTopVer)
         throws IgniteCheckedException {
-        List<DynamicCacheDescriptor> started = cachesInfo.cachesReceivedFromJoin(nodeId);
+        List<DynamicCacheDescriptor> receivedCaches = cachesInfo.cachesReceivedFromJoin(nodeId);
 
-        for (DynamicCacheDescriptor desc : started) {
-            IgnitePredicate<ClusterNode> filter = desc.groupDescriptor().config().getNodeFilter();
+        List<StartCacheInfo> startCacheInfos = receivedCaches.stream()
+            .filter(desc -> isLocalAffinity(desc.groupDescriptor().config())) // Start only caches whose group node filter accepts the local node.
+            .map(desc -> new StartCacheInfo(desc, null, exchTopVer, false))
+            .collect(Collectors.toList());
 
-            if (CU.affinityNode(ctx.discovery().localNode(), filter)) {
-                prepareCacheStart(
-                    desc.cacheConfiguration(),
-                    desc,
-                    null,
-                    exchTopVer,
-                    false);
+        prepareStartCaches(startCacheInfos);
+
+        return receivedCaches; // All received descriptors are returned, including those filtered out and not started here.
+    }
+
+    /**
+     * @param cacheConfiguration Configuration whose node filter is checked against the local node.
+     * @return {@code true} if {@code CU.affinityNode} accepts the local node for the configuration's node filter.
+     */
+    private boolean isLocalAffinity(CacheConfiguration cacheConfiguration) {
+        return CU.affinityNode(ctx.discovery().localNode(), cacheConfiguration.getNodeFilter());
+    }
+
+    /**
+     * Starts all given caches (in parallel when allowed), letting any cache start failure propagate as is.
+     *
+     * @param startCacheInfos Start information for every cache to start.
+     */
+    void prepareStartCaches(Collection<StartCacheInfo> startCacheInfos) throws IgniteCheckedException {
+        prepareStartCaches(startCacheInfos, (data, operation) -> {
+            operation.accept(data); // Pass-through handler: just runs the operation, no failure recovery.
+        });
+    }
+
+    /**
+     * Tries to start all given caches (in parallel when allowed), recording per-cache failures instead of aborting.
+     *
+     * @param startCacheInfos Caches info for start.
+     * @return Caches that failed to start, mapped to the exception that caused the failure.
+     * @throws IgniteCheckedException If the start was interrupted (interruption is rethrown, not recorded).
+     */
+    Map<StartCacheInfo, IgniteCheckedException> prepareStartCachesIfPossible(Collection<StartCacheInfo> startCacheInfos) throws IgniteCheckedException {
+        Map<StartCacheInfo, IgniteCheckedException> failedCaches = new ConcurrentHashMap<>(); // Handler may run on several pool threads.
+
+        prepareStartCaches(startCacheInfos, (data, operation) -> {
+            try {
+                operation.accept(data);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw e; // Interruption must abort the whole start rather than count as a single cache failure.
+            }
+            catch (IgniteCheckedException e) {
+                log.warning("Cache can not be started : cache=" + data.getStartedConfiguration().getName());
+
+                failedCaches.put(data, e);
+            }
+        });
+
+        return failedCaches;
+    }
+
+    /**
+     * Starts all given caches, in parallel when it is allowed and more than one cache is requested.
+     *
+     * @param startCacheInfos Start information for every cache to start.
+     * @param cacheStartFailHandler Handler wrapping every per-cache start step; it decides how a failure is treated.
+     */
+    private void prepareStartCaches(
+        Collection<StartCacheInfo> startCacheInfos,
+        StartCacheFailHandler<StartCacheInfo> cacheStartFailHandler
+    ) throws IgniteCheckedException {
+        if (!IGNITE_ALLOW_START_CACHES_IN_PARALLEL || startCacheInfos.size() <= 1) {
+            for (StartCacheInfo startCacheInfo : startCacheInfos) {
+                cacheStartFailHandler.handle(
+                    startCacheInfo,
+                    cacheInfo -> prepareCacheStart(
+                        cacheInfo.getCacheDescriptor().cacheConfiguration(),
+                        cacheInfo.getCacheDescriptor(),
+                        cacheInfo.getReqNearCfg(),
+                        cacheInfo.getExchangeTopVer(),
+                        cacheInfo.isDisabledAfterStart()
+                    )
+                );
+
                 context().exchange().exchangerUpdateHeartbeat();
             }
         }
+        else {
+            Map<StartCacheInfo, GridCacheContext> cacheContexts = new ConcurrentHashMap<>();
+
+            int parallelismLvl = sharedCtx.kernalContext().config().getSystemThreadPoolSize();
+
+            // Leave 2 system pool threads for other operations, but always use at least one thread.
+            parallelismLvl = Math.max(1, parallelismLvl - 2);
+
+            doInParallel(
+                parallelismLvl,
+                sharedCtx.kernalContext().getSystemExecutorService(),
+                startCacheInfos,
+                startCacheInfo ->
+                    cacheStartFailHandler.handle(
+                        startCacheInfo,
+                        cacheInfo -> {
+                            GridCacheContext cacheCtx = prepareCacheContext(
+                                cacheInfo.getCacheDescriptor().cacheConfiguration(),
+                                cacheInfo.getCacheDescriptor(),
+                                cacheInfo.getReqNearCfg(),
+                                cacheInfo.getExchangeTopVer(),
+                                cacheInfo.isDisabledAfterStart()
+                            );
+                            cacheContexts.put(cacheInfo, cacheCtx);
+
+                            context().exchange().exchangerUpdateHeartbeat();
+                        }
+                    )
+            );
 
-        return started;
+            /*
+             * This hack is required because we can't start SQL schemas in parallel for the following reasons:
+             * * checking indexes for duplicates (and other checks) requires the same order on every node.
+             * * onCacheStart and createSchema contain a lot of mutexes.
+             *
+             * TODO IGNITE-9729
+             */
+            Set<StartCacheInfo> successfullyPreparedCaches = cacheContexts.keySet();
+
+            List<StartCacheInfo> cacheInfosInOriginalOrder = startCacheInfos.stream()
+                .filter(successfullyPreparedCaches::contains)
+                .collect(Collectors.toList());
+
+            for (StartCacheInfo startCacheInfo : cacheInfosInOriginalOrder) {
+                cacheStartFailHandler.handle(
+                    startCacheInfo,
+                    cacheInfo -> {
+                        ctx.query().onCacheStart(
+                            cacheContexts.get(cacheInfo),
+                            cacheInfo.getCacheDescriptor().schema() != null
+                                ? cacheInfo.getCacheDescriptor().schema()
+                                : new QuerySchema()
+                        );
+
+                        context().exchange().exchangerUpdateHeartbeat();
+                    }
+                );
+            }
+
+            doInParallel(
+                parallelismLvl,
+                sharedCtx.kernalContext().getSystemExecutorService(),
+                cacheContexts.entrySet(),
+                cacheCtxEntry ->
+                    cacheStartFailHandler.handle(
+                        cacheCtxEntry.getKey(),
+                        cacheInfo -> {
+                            onCacheStarted(cacheCtxEntry.getValue());
+
+                            context().exchange().exchangerUpdateHeartbeat();
+                        }
+                    )
+            );
+        }
     }
 
     /**
@@ -2090,71 +2177,156 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
         AffinityTopologyVersion exchTopVer,
         boolean disabledAfterStart
     ) throws IgniteCheckedException {
+        GridCacheContext cacheCtx = prepareCacheContext(startCfg, desc, reqNearCfg, exchTopVer, disabledAfterStart);
+
+        ctx.query().onCacheStart(cacheCtx, desc.schema() != null ? desc.schema() : new QuerySchema());
+
+        onCacheStarted(cacheCtx);
+    }
+
+    /**
+     * Prepares cache context for start.
+     *
+     * @param startCfg Cache configuration to use.
+     * @param desc Cache descriptor.
+     * @param reqNearCfg Near configuration if specified for client cache start request.
+     * @param exchTopVer Current exchange version.
+     * @param disabledAfterStart If true, then we will discard restarting state from proxies. If false then we will change
+     *  state of proxies to restarting
+     * @return Created {@link GridCacheContext}.
+     * @throws IgniteCheckedException if failed.
+     */
+    private GridCacheContext prepareCacheContext(
+        CacheConfiguration startCfg,
+        DynamicCacheDescriptor desc,
+        @Nullable NearCacheConfiguration reqNearCfg,
+        AffinityTopologyVersion exchTopVer,
+        boolean disabledAfterStart
+    ) throws IgniteCheckedException {
         assert !caches.containsKey(startCfg.getName()) : startCfg.getName();
 
         CacheConfiguration ccfg = new CacheConfiguration(startCfg);
 
         CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
-        boolean affNode;
+        boolean affNode = checkForAffinityNode(desc, reqNearCfg, ccfg);
 
-        if (ccfg.getCacheMode() == LOCAL) {
-            affNode = true;
+        preparePageStore(desc, affNode);
+
+        CacheGroupContext grp = prepareCacheGroup(desc, exchTopVer, cacheObjCtx, affNode, startCfg.getGroupName());
+
+        GridCacheContext cacheCtx = createCache(ccfg,
+            grp,
+            null,
+            desc,
+            exchTopVer,
+            cacheObjCtx,
+            affNode,
+            true,
+            disabledAfterStart
+        );
 
+        initCacheContext(cacheCtx, ccfg, desc.deploymentId());
+
+        return cacheCtx;
+    }
+
+    /**
+     * Check for affinity node and customize near configuration if needed.
+     *
+     * @param desc Cache descriptor.
+     * @param reqNearCfg Near configuration if specified for client cache start request.
+     * @param ccfg Cache configuration to use.
+     * @return {@code true} if it is affinity node for cache.
+     */
+    private boolean checkForAffinityNode(
+        DynamicCacheDescriptor desc,
+        @Nullable NearCacheConfiguration reqNearCfg,
+        CacheConfiguration ccfg
+    ) {
+        if (ccfg.getCacheMode() == LOCAL) {
             ccfg.setNearConfiguration(null);
-        }
-        else if (CU.affinityNode(ctx.discovery().localNode(), desc.groupDescriptor().config().getNodeFilter()))
-            affNode = true;
-        else {
-            affNode = false;
 
-            ccfg.setNearConfiguration(reqNearCfg);
+            return true;
         }
 
-        if (sharedCtx.pageStore() != null && affNode)
-            sharedCtx.pageStore().initializeForCache(desc.groupDescriptor(), desc.toStoredData());
-
-        String grpName = startCfg.getGroupName();
+        if (isLocalAffinity(desc.groupDescriptor().config()))
+            return true;
 
-        CacheGroupContext grp = null;
+        ccfg.setNearConfiguration(reqNearCfg);
 
-        if (grpName != null) {
-            for (CacheGroupContext grp0 : cacheGrps.values()) {
-                if (grp0.sharedGroup() && grpName.equals(grp0.name())) {
-                    grp = grp0;
+        return false;
+    }
 
-                    break;
-                }
-            }
+    /**
+     * Initializes page store for the cache. Does nothing if page store is absent or node is not an affinity node.
+     *
+     * @param desc Cache descriptor; its group ID is the key passed to the initialization protector.
+     * @param affNode {@code true} if it is an affinity node for the cache.
+     * @throws IgniteCheckedException if failed.
+     */
+    private void preparePageStore(DynamicCacheDescriptor desc, boolean affNode) throws IgniteCheckedException {
+        if (sharedCtx.pageStore() != null && affNode)
+            initializationProtector.protect(
+                desc.groupDescriptor().groupId(),
+                () -> sharedCtx.pageStore().initializeForCache(desc.groupDescriptor(), desc.toStoredData())
+            );
+    }
 
-            if (grp == null) {
-                grp = startCacheGroup(desc.groupDescriptor(),
+    /**
+     * Prepare cache group to start cache.
+     *
+     * @param desc Cache descriptor.
+     * @param exchTopVer Current exchange version.
+     * @param cacheObjCtx Cache object context.
+     * @param affNode {@code true} if it is affinity node for cache.
+     * @param grpName Group name.
+     * @return Prepared cache group context.
+     * @throws IgniteCheckedException if failed.
+     */
+    private CacheGroupContext prepareCacheGroup(
+        DynamicCacheDescriptor desc,
+        AffinityTopologyVersion exchTopVer,
+        CacheObjectContext cacheObjCtx,
+        boolean affNode,
+        String grpName
+    ) throws IgniteCheckedException {
+        if (grpName != null) {
+            return initializationProtector.protect(
+                desc.groupId(),
+                () -> findCacheGroup(grpName),
+                () -> startCacheGroup(
+                    desc.groupDescriptor(),
                     desc.cacheType(),
                     affNode,
                     cacheObjCtx,
-                    exchTopVer);
-            }
-        }
-        else {
-            grp = startCacheGroup(desc.groupDescriptor(),
-                desc.cacheType(),
-                affNode,
-                cacheObjCtx,
-                exchTopVer);
+                    exchTopVer
+                )
+            );
         }
 
-        GridCacheContext cacheCtx = createCache(ccfg,
-            grp,
-            null,
-            desc,
-            exchTopVer,
-            cacheObjCtx,
+        return startCacheGroup(desc.groupDescriptor(),
+            desc.cacheType(),
             affNode,
-            true,
-            disabledAfterStart
+            cacheObjCtx,
+            exchTopVer
         );
+    }
 
-        cacheCtx.dynamicDeploymentId(desc.deploymentId());
+    /**
+     * Initialize created cache context.
+     *
+     * @param cacheCtx Cache context to initialize.
+     * @param cfg Cache configuration.
+     * @param deploymentId Dynamic deployment ID.
+     * @throws IgniteCheckedException if failed.
+     */
+    private void initCacheContext(
+        GridCacheContext<?, ?> cacheCtx,
+        CacheConfiguration cfg,
+        IgniteUuid deploymentId
+    ) throws IgniteCheckedException {
+        cacheCtx.dynamicDeploymentId(deploymentId);
 
         GridCacheAdapter cache = cacheCtx.cache();
 
@@ -2162,7 +2334,67 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
 
         caches.put(cacheCtx.name(), cache);
 
-        startCache(cache, desc.schema() != null ? desc.schema() : new QuerySchema());
+        // Intentionally compare Boolean references using '!=' below to check if the flag has been explicitly set.
+        if (cfg.isStoreKeepBinary() && cfg.isStoreKeepBinary() != CacheConfiguration.DFLT_STORE_KEEP_BINARY
+            && !(ctx.config().getMarshaller() instanceof BinaryMarshaller))
+            U.warn(log, "CacheConfiguration.isStoreKeepBinary() configuration property will be ignored because " +
+                "BinaryMarshaller is not used");
+
+        // Start managers.
+        for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx))))
+            mgr.start(cacheCtx);
+
+        cacheCtx.initConflictResolver();
+
+        if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) {
+            GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context();
+
+            // Start DHT managers.
+            for (GridCacheManager mgr : dhtManagers(dhtCtx))
+                mgr.start(dhtCtx);
+
+            dhtCtx.initConflictResolver();
+
+            // Start DHT cache.
+            dhtCtx.cache().start();
+
+            if (log.isDebugEnabled())
+                log.debug("Started DHT cache: " + dhtCtx.cache().name());
+        }
+
+        ctx.continuous().onCacheStart(cacheCtx);
+
+        cacheCtx.cache().start();
+    }
+
+    /**
+     * Handles a cache context which was fully prepared.
+     *
+     * @param cacheCtx Fully prepared context.
+     * @throws IgniteCheckedException if failed.
+     */
+    private void onCacheStarted(GridCacheContext cacheCtx) throws IgniteCheckedException {
+        GridCacheAdapter cache = cacheCtx.cache();
+        CacheConfiguration cfg = cacheCtx.config();
+        CacheGroupContext grp = cacheGrps.get(cacheCtx.groupId());
+
+        cacheCtx.onStarted();
+
+        String dataRegion = cfg.getDataRegionName();
+
+        if (dataRegion == null && ctx.config().getDataStorageConfiguration() != null)
+            dataRegion = ctx.config().getDataStorageConfiguration().getDefaultDataRegionConfiguration().getName();
+
+        if (log.isInfoEnabled()) {
+            log.info("Started cache [name=" + cfg.getName() +
+                ", id=" + cacheCtx.cacheId() +
+                (cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") +
+                ", dataRegionName=" + dataRegion +
+                ", mode=" + cfg.getCacheMode() +
+                ", atomicity=" + cfg.getAtomicityMode() +
+                ", backups=" + cfg.getBackups() +
+                ", mvcc=" + cacheCtx.mvccEnabled() + ']');
+        }
 
         grp.onCacheStarted(cacheCtx);
 
@@ -2170,6 +2402,17 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
     }
 
     /**
+     * @param grpName Group name.
+     * @return Shared cache group with the given name or {@code null} if no such group is found.
+     */
+    private CacheGroupContext findCacheGroup(String grpName) {
+        return cacheGrps.values().stream()
+            .filter(grp -> grp.sharedGroup() && grpName.equals(grp.name()))
+            .findAny()
+            .orElse(null);
+    }
+
+    /**
      * Restarts proxies of caches if they was marked as restarting. Requires external synchronization - shouldn't be
      * called concurrently with another caches restart.
      */
@@ -4839,7 +5082,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
 
                     // Check if we were asked to start a near cache.
                     if (nearCfg != null) {
-                        if (CU.affinityNode(ctx.discovery().localNode(), descCfg.getNodeFilter())) {
+                        if (isLocalAffinity(descCfg)) {
                             // If we are on a data node and near cache was enabled, return success, else - fail.
                             if (descCfg.getNearConfiguration() != null)
                                 return null;
@@ -4851,7 +5094,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
                             // If local node has near cache, return success.
                             req.clientStartOnly(true);
                     }
-                    else if (!CU.affinityNode(ctx.discovery().localNode(), descCfg.getNodeFilter()))
+                    else if (!isLocalAffinity(descCfg))
                         req.clientStartOnly(true);
 
                     req.deploymentId(desc.deploymentId());
@@ -5020,6 +5263,22 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
     }
 
     /**
+     * Handler of failures during cache start.
+     *
+     * @param <T> Type of started data.
+     */
+    private static interface StartCacheFailHandler<T> {
+        /**
+         * Handles a failure of the given cache start operation.
+         *
+         * @param data Start data.
+         * @param startCacheOperation Operation to start the cache.
+         * @throws IgniteCheckedException if failed.
+         */
+        void handle(T data, IgniteThrowableConsumer<T> startCacheOperation) throws IgniteCheckedException;
+    }
+
+    /**
      *
      */
     @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java
new file mode 100644
index 0000000..a5aea26
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Holds the information required to start a specific cache.
+ */
+public class StartCacheInfo {
+    /** Cache configuration for start. */
+    private final CacheConfiguration startedConf;
+
+    /** Cache descriptor for start. */
+    private final DynamicCacheDescriptor desc;
+
+    /** Near cache configuration for start, possibly {@code null}. */
+    private final @Nullable NearCacheConfiguration reqNearCfg;
+
+    /** Exchange topology version in which starting happened. */
+    private final AffinityTopologyVersion exchTopVer;
+
+    /** Whether to disable the started cache after start. */
+    private final boolean disabledAfterStart;
+
+    /**
+     * @param desc Cache descriptor for start; its cache configuration is used as the start configuration.
+     * @param reqNearCfg Near cache configuration for start.
+     * @param exchTopVer Exchange topology version in which starting happened.
+     * @param disabledAfterStart Whether to disable the started cache after start.
+     */
+    public StartCacheInfo(DynamicCacheDescriptor desc,
+        NearCacheConfiguration reqNearCfg,
+        AffinityTopologyVersion exchTopVer, boolean disabledAfterStart) {
+        this(desc.cacheConfiguration(), desc, reqNearCfg, exchTopVer, disabledAfterStart);
+    }
+
+    /**
+     * @param conf Cache configuration for start.
+     * @param desc Cache descriptor for start.
+     * @param reqNearCfg Near cache configuration for start.
+     * @param exchTopVer Exchange topology version in which starting happened.
+     * @param disabledAfterStart Whether to disable the started cache after start.
+     */
+    public StartCacheInfo(CacheConfiguration conf, DynamicCacheDescriptor desc,
+        NearCacheConfiguration reqNearCfg,
+        AffinityTopologyVersion exchTopVer, boolean disabledAfterStart) {
+        startedConf = conf;
+        this.desc = desc;
+        this.reqNearCfg = reqNearCfg;
+        this.exchTopVer = exchTopVer;
+        this.disabledAfterStart = disabledAfterStart;
+    }
+
+    /**
+     * @return Cache configuration for start.
+     */
+    public CacheConfiguration getStartedConfiguration() {
+        return startedConf;
+    }
+
+    /**
+     * @return Cache descriptor for start.
+     */
+    public DynamicCacheDescriptor getCacheDescriptor() {
+        return desc;
+    }
+
+    /**
+     * @return Near cache configuration for start.
+     */
+    @Nullable public NearCacheConfiguration getReqNearCfg() {
+        return reqNearCfg;
+    }
+
+    /**
+     * @return Exchange topology version in which starting happened.
+     */
+    public AffinityTopologyVersion getExchangeTopVer() {
+        return exchTopVer;
+    }
+
+    /**
+     * @return Whether to disable the started cache after start.
+     */
+    public boolean isDisabledAfterStart() {
+        return disabledAfterStart;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StartCacheInfo.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d494857..0c2cbe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -3512,36 +3512,34 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             U.doInParallel(
                 cctx.kernalContext().getSystemExecutorService(),
                 nonLocalCacheGroupDescriptors(),
-                new IgniteInClosureX<CacheGroupDescriptor>() {
-                    @Override public void applyx(CacheGroupDescriptor grpDesc) {
-                        CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpDesc.groupId());
-
-                        GridDhtPartitionTopology top = grpCtx != null
-                            ? grpCtx.topology()
-                            : cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache());
-
-                        // Do not validate read or write through caches or caches with disabled rebalance
-                        // or ExpiryPolicy is set or validation is disabled.
-                        if (grpCtx == null
-                            || grpCtx.config().isReadThrough()
-                            || grpCtx.config().isWriteThrough()
-                            || grpCtx.config().getCacheStoreFactory() != null
-                            || grpCtx.config().getRebalanceDelay() == -1
-                            || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE
-                            || grpCtx.config().getExpiryPolicyFactory() == null
-                            || SKIP_PARTITION_SIZE_VALIDATION)
-                            return;
+                grpDesc -> {
+                    CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpDesc.groupId());
+
+                    GridDhtPartitionTopology top = grpCtx != null
+                        ? grpCtx.topology()
+                        : cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache());
+
+                    // Do not validate read or write through caches or caches with disabled rebalance
+                    // or ExpiryPolicy is set or validation is disabled.
+                    if (grpCtx == null
+                        || grpCtx.config().isReadThrough()
+                        || grpCtx.config().isWriteThrough()
+                        || grpCtx.config().getCacheStoreFactory() != null
+                        || grpCtx.config().getRebalanceDelay() == -1
+                        || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE
+                        || grpCtx.config().getExpiryPolicyFactory() == null
+                        || SKIP_PARTITION_SIZE_VALIDATION)
+                        return;
 
-                        try {
-                            validator.validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture.this, top, msgs);
-                        }
-                        catch (IgniteCheckedException ex) {
-                            log.warning("Partition states validation has failed for group: " + grpCtx.cacheOrGroupName() + ". " + ex.getMessage());
-                            // TODO: Handle such errors https://issues.apache.org/jira/browse/IGNITE-7833
-                        }
+                    try {
+                        validator.validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture.this, top, msgs);
                     }
-                },
-                null);
+                    catch (IgniteCheckedException ex) {
+                        log.warning("Partition states validation has failed for group: " + grpCtx.cacheOrGroupName() + ". " + ex.getMessage());
+                        // TODO: Handle such errors https://issues.apache.org/jira/browse/IGNITE-7833
+                    }
+                }
+            );
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException("Failed to validate partitions state", e);
@@ -3561,21 +3559,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             U.doInParallel(
                 cctx.kernalContext().getSystemExecutorService(),
                 nonLocalCacheGroupDescriptors(),
-                new IgniteInClosureX<CacheGroupDescriptor>() {
-                    @Override public void applyx(CacheGroupDescriptor grpDesc) {
-                        CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpDesc.groupId());
+                grpDesc -> {
+                    CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpDesc.groupId());
 
-                        GridDhtPartitionTopology top = grpCtx != null
-                            ? grpCtx.topology()
-                            : cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache());
+                    GridDhtPartitionTopology top = grpCtx != null
+                        ? grpCtx.topology()
+                        : cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache());
 
-                        if (!CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration()))
-                            assignPartitionSizes(top);
-                        else
-                            assignPartitionStates(top);
-                    }
-                },
-                null);
+                    if (!CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration()))
+                        assignPartitionSizes(top);
+                    else
+                        assignPartitionStates(top);
+                }
+            );
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException("Failed to assign partition states", e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index e6f374a..2fe0eb8 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -142,6 +142,7 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 import java.util.zip.ZipInputStream;
@@ -216,19 +217,16 @@ import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryNativeLoader;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
 import org.apache.ignite.internal.util.lang.GridTuple;
-import org.apache.ignite.internal.util.lang.IgniteInClosureX;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFutureCancelledException;
@@ -236,6 +234,7 @@ import org.apache.ignite.lang.IgniteFutureTimeoutException;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.lifecycle.LifecycleAware;
 import org.apache.ignite.marshaller.Marshaller;
@@ -10739,54 +10738,91 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * Execute operation on data in parallel.
+     *
      * @param executorSvc Service for parallel execution.
      * @param srcDatas List of data for parallelization.
-     * @param consumer Logic for execution of on each item of data.
-     * @param errHnd Optionan error handler. If not {@code null}, an error of each item execution will be passed to
-     *      this handler. If error handler is not {@code null}, the exception will not be thrown from this method.
+     * @param operation Logic to execute on each item of data.
      * @param <T> Type of data.
-     * @return List of (item, execution future) tuples.
-     * @throws IgniteCheckedException If parallel execution failed and {@code errHnd} is {@code null}.
+     * @throws IgniteCheckedException if parallel execution failed.
      */
-    public static <T> List<T2<T, Future<Object>>> doInParallel(
+    public static <T> void doInParallel(ExecutorService executorSvc, Collection<T> srcDatas,
+        IgniteThrowableConsumer<T> operation) throws IgniteCheckedException, IgniteInterruptedCheckedException {
+        doInParallel(srcDatas.size(), executorSvc, srcDatas, operation);
+    }
+
+    /**
+     * Execute operation on data in parallel.
+     *
+     * @param parallelismLvl Number of batches to split the data into (one executor task per non-empty batch).
+     * @param executorSvc Service for parallel execution.
+     * @param srcDatas List of data for parallelization.
+     * @param operation Logic to execute on each item of data.
+     * @param <T> Type of data.
+     * @throws IgniteCheckedException if parallel execution failed.
+     */
+    public static <T> void doInParallel(
+        int parallelismLvl,
         ExecutorService executorSvc,
         Collection<T> srcDatas,
-        IgniteInClosureX<T> consumer,
-        @Nullable IgniteBiInClosure<T, Throwable> errHnd
-    ) throws IgniteCheckedException {
-        List<T2<T, Future<Object>>> consumerFutures = srcDatas.stream()
-            .map(item -> new T2<>(
-                item,
-                executorSvc.submit(() -> {
-                    consumer.apply(item);
+        IgniteThrowableConsumer<T> operation
+    ) throws IgniteCheckedException, IgniteInterruptedCheckedException {
+        List<List<T>> batches = IntStream.range(0, parallelismLvl)
+            .mapToObj(i -> new ArrayList<T>())
+            .collect(Collectors.toList());
 
-                    return null;
-                })))
+        int i = 0;
+
+        for (T src : srcDatas)
+            batches.get(i++ % parallelismLvl).add(src);
+
+        List<Future<Object>> consumerFutures = batches.stream()
+            .filter(batch -> !batch.isEmpty())
+            .map(batch -> executorSvc.submit(() -> {
+                for (T item : batch)
+                    operation.accept(item);
+
+                return null;
+            }))
             .collect(Collectors.toList());
 
-        IgniteCheckedException composite = null;
+        Throwable error =null;
 
-        for (T2<T, Future<Object>> tup : consumerFutures) {
+        for (Future<Object> future : consumerFutures) {
             try {
-                getUninterruptibly(tup.get2());
+                future.get();
             }
-            catch (ExecutionException e) {
-                if (errHnd != null)
-                    errHnd.apply(tup.get1(), e.getCause());
-                else {
-                    if (composite == null)
-                        composite = new IgniteCheckedException("Failed to execute one of the tasks " +
-                            "(see suppressed exception for details)");
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
 
-                    composite.addSuppressed(e.getCause());
-                }
+                throw new IgniteInterruptedCheckedException(e);
+            }
+            catch (ExecutionException e) {
+                if(error == null)
+                    error = e.getCause();
+                else
+                    error.addSuppressed(e.getCause());
+            }
+            catch (CancellationException e) {
+                if(error == null)
+                    error = e;
+                else
+                    error.addSuppressed(e);
             }
         }
 
-        if (composite != null)
-            throw composite;
+        if (error != null) {
+            if (error instanceof IgniteCheckedException)
+                throw (IgniteCheckedException)error;
 
-        return consumerFutures;
+            if (error instanceof RuntimeException)
+                throw (RuntimeException)error;
+
+            if (error instanceof Error)
+                throw (Error)error;
+
+            throw new IgniteCheckedException(error);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/util/InitializationProtector.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/InitializationProtector.java b/modules/core/src/main/java/org/apache/ignite/internal/util/InitializationProtector.java
new file mode 100644
index 0000000..7c501c4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/InitializationProtector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
+
+/**
+ * Serializes initialization code run from different threads so that a value is not initialized more than once.
+ */
+public class InitializationProtector {
+    /** Default striped lock concurrency level: the number of processors available to the JVM. */
+    private static final int DEFAULT_CONCURRENCY_LEVEL = Runtime.getRuntime().availableProcessors();
+
+    /** Striped lock; the stripe for an initialization is chosen by the hash code of the protected key. */
+    private GridStripedLock stripedLock = new GridStripedLock(DEFAULT_CONCURRENCY_LEVEL);
+
+    /**
+     * Returns the value from {@code initializedVal}; if it is null, runs {@code initializationCode} under a lock.
+     * @param protectedKey Key identifying the value; its hash code selects the lock stripe.
+     * @param initializedVal Supplier of the already initialized value, or of null if initialization is required.
+     * @param initializationCode Code initializing the value for protectedKey. Should be idempotent.
+     * @param <T> Type of initialization value.
+     * @return Value supplied by initializedVal once initialization is done; null if it still supplies null.
+     * @throws IgniteCheckedException If initialization failed.
+     */
+    public <T> T protect(Object protectedKey, Supplier<T> initializedVal,
+        IgniteThrowableRunner initializationCode) throws IgniteCheckedException {
+        T value = initializedVal.get();
+
+        if (value != null)
+            return value;
+
+        Lock lock = stripedLock.getLock(protectedKey.hashCode() % stripedLock.concurrencyLevel());
+
+        lock.lock();
+        try {
+            value = initializedVal.get();
+
+            if (value != null)
+                return value;
+
+            initializationCode.run();
+
+            return initializedVal.get();
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Runs the initialization code under the lock stripe selected by the given key, so that callers sharing
+     * that stripe do not run it simultaneously. The supplied value is always null, so the code runs on every call.
+     * @param protectedKey Key whose hash code selects the lock stripe.
+     * @param initializationCode Code to run under the lock. Should be idempotent.
+     * @throws IgniteCheckedException If initialization failed.
+     */
+    public void protect(Object protectedKey, IgniteThrowableRunner initializationCode) throws IgniteCheckedException {
+        protect(protectedKey, () -> null, initializationCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java
new file mode 100644
index 0000000..46813a9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.lang;
+
+import java.io.Serializable;
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Represents an operation that accepts a single input argument, returns no result and may fail with
+ * {@link IgniteCheckedException}. {@code IgniteThrowableConsumer} is expected to operate via side-effects.
+ *
+ * @param <E> Type of closure parameter.
+ */
+public interface IgniteThrowableConsumer<E> extends Serializable {
+    /**
+     * Consumer body.
+     *
+     * @param e Consumer parameter.
+     * @throws IgniteCheckedException If body execution failed.
+     */
+    public void accept(E e) throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableRunner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableRunner.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableRunner.java
new file mode 100644
index 0000000..a5c95e1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableRunner.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.lang;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Represents an action without arguments and result that may fail with {@link IgniteCheckedException}.
+ */
+public interface IgniteThrowableRunner {
+    /**
+     * Executes the body, signalling failure with {@link IgniteCheckedException}.
+     */
+    void run() throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartInParallelTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartInParallelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartInParallelTest.java
new file mode 100644
index 0000000..4e30d1c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartInParallelTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks that starting caches in parallel is faster than starting them one by one.
+ */
+public class CacheStartInParallelTest extends GridCommonAbstractTest {
+    /** Number of caches started by every test. */
+    private static final int CACHES_COUNT = 40;
+
+    /** Name prefix of the caches declared in the node configuration. */
+    private static final String STATIC_CACHE_PREFIX = "static-cache-";
+
+    /** Name prefix of the caches created at runtime. */
+    private static final String DYNAMIC_CACHE_PREFIX = "dynamic-cache-";
+
+    /** Whether {@link #getConfiguration(String)} should declare the static caches. */
+    private static boolean isStaticCache = true;
+
+    /** IP finder set on the discovery SPI of every started node. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setSystemThreadPoolSize(10);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        long sz = 100 * 1024 * 1024;
+
+        DataStorageConfiguration memCfg = new DataStorageConfiguration().setPageSize(1024)
+            .setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration().setPersistenceEnabled(true).setInitialSize(sz).setMaxSize(sz))
+            .setWalMode(WALMode.LOG_ONLY).setCheckpointFrequency(24L * 60 * 60 * 1000);
+
+        cfg.setDataStorageConfiguration(memCfg);
+
+        if (isStaticCache) {
+            ArrayList<CacheConfiguration> staticCaches = new ArrayList<>(CACHES_COUNT);
+
+            for (int i = 0; i < CACHES_COUNT; i++)
+                staticCaches.add(cacheConfiguration(STATIC_CACHE_PREFIX + i));
+
+            cfg.setCacheConfiguration(staticCaches.toArray(new CacheConfiguration[CACHES_COUNT]));
+        }
+
+        return cfg;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Default cache configuration with the given name and one backup.
+     */
+    private CacheConfiguration cacheConfiguration(String cacheName) {
+        CacheConfiguration cfg = defaultCacheConfiguration();
+
+        cfg.setName(cacheName);
+        cfg.setBackups(1);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanupTestData();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        cleanupTestData();
+    }
+
+    /** Stops all grids, cleans the persistence directory and resets the switches changed by the tests. */
+    private void cleanupTestData() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        System.clearProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL);
+
+        isStaticCache = true;
+    }
+
+    /**
+     * Checks that static caches start faster in parallel than sequentially.
+     *
+     * @throws Exception if fail.
+     */
+    public void testParallelizationAcceleratesStartOfStaticCaches() throws Exception {
+        // Start caches sequentially.
+        System.setProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, "false");
+
+        long startTime = System.currentTimeMillis();
+
+        IgniteEx igniteEx = startGrid(0);
+
+        igniteEx.cluster().active(true);
+
+        long totalStartTimeConsistently = System.currentTimeMillis() - startTime;
+
+        // Check that every cache has started.
+        for (int i = 0; i < CACHES_COUNT; i++)
+            igniteEx.cache(STATIC_CACHE_PREFIX + i).put(i, i);
+
+        stopAllGrids();
+
+        // Start caches in parallel.
+        System.setProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, "true");
+
+        startTime = System.currentTimeMillis();
+
+        igniteEx = startGrid(0);
+
+        igniteEx.cluster().active(true);
+
+        long totalStartTimeInParallel = System.currentTimeMillis() - startTime;
+
+        for (int i = 0; i < CACHES_COUNT; i++)
+            igniteEx.cache(STATIC_CACHE_PREFIX + i).put(i, i);
+
+        stopAllGrids();
+
+        assertTrue("Sequential cache start time : " + totalStartTimeConsistently +
+                ", parallel cache start time : " + totalStartTimeInParallel,
+            totalStartTimeConsistently > totalStartTimeInParallel);
+    }
+
+    /**
+     * Checks that dynamic caches start faster in parallel than sequentially.
+     *
+     * @throws Exception if fail.
+     */
+    public void testParallelizationAcceleratesStartOfCaches2() throws Exception {
+        // Prepare dynamic caches: create them once so that the restarts below have to start them again.
+        isStaticCache = false;
+
+        IgniteEx igniteEx = startGrid(0);
+
+        igniteEx.cluster().active(true);
+
+        for (int i = 0; i < CACHES_COUNT; i++)
+            igniteEx.getOrCreateCache(DYNAMIC_CACHE_PREFIX + i);
+
+        stopAllGrids();
+
+        // Start caches sequentially. The timer covers node start and activation, exactly as in the parallel run.
+        System.setProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, "false");
+
+        long startTime = System.currentTimeMillis();
+        igniteEx = startGrid(0);
+
+        igniteEx.cluster().active(true);
+
+        long totalStartTimeConsistently = System.currentTimeMillis() - startTime;
+
+        for (int i = 0; i < CACHES_COUNT; i++)
+            igniteEx.cache(DYNAMIC_CACHE_PREFIX + i).put(i, i);
+
+        stopAllGrids();
+
+        // Start caches in parallel.
+        System.setProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, "true");
+
+        startTime = System.currentTimeMillis();
+
+        igniteEx = startGrid(0);
+
+        igniteEx.cluster().active(true);
+
+        long totalStartTimeInParallel = System.currentTimeMillis() - startTime;
+
+        for (int i = 0; i < CACHES_COUNT; i++)
+            igniteEx.cache(DYNAMIC_CACHE_PREFIX + i).put(i, i);
+
+        stopAllGrids();
+
+        assertTrue("Sequential cache start time : " + totalStartTimeConsistently +
+                ", parallel cache start time : " + totalStartTimeInParallel,
+            totalStartTimeConsistently > totalStartTimeInParallel);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
index 870ce67..66453b8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
@@ -17,15 +17,15 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
-import javax.cache.Cache;
-import javax.cache.configuration.Factory;
-import javax.cache.integration.CacheLoaderException;
-import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.store.CacheStore;
@@ -61,7 +61,7 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
         CacheConfiguration cfg3 = cacheConfiguration("cacheC", new SecondStoreFactory());
         CacheConfiguration cfg4 = cacheConfiguration("cacheD", null);
 
-        cfg.setCacheConfiguration(cfg1, cfg2, cfg3, cfg4);
+        cfg.setCacheConfiguration(cfg4, cfg2, cfg3, cfg1);
 
         return cfg;
     }
@@ -92,6 +92,8 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
+        cleanPersistenceDir();
+
         startGrids(4);
     }
 
@@ -160,8 +162,8 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
             "delete cacheA",
             "write cacheB",
             "sessionEnd true"
-        ),
-        firstStoreEvts);
+            ),
+            firstStoreEvts);
 
         assertEquals(0, secondStoreEvts.size());
     }
@@ -209,16 +211,16 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
             "write cacheA",
             "delete cacheA",
             "sessionEnd true"
-        ),
-        firstStoreEvts);
+            ),
+            firstStoreEvts);
 
         assertEqualsCollections(F.asList(
             "writeAll cacheC 2",
             "deleteAll cacheC 2",
             "write cacheC",
             "sessionEnd true"
-        ),
-        secondStoreEvts);
+            ),
+            secondStoreEvts);
     }
 
     /**
@@ -264,8 +266,8 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
             "write cacheA",
             "delete cacheA",
             "sessionEnd true"
-        ),
-        firstStoreEvts);
+            ),
+            firstStoreEvts);
 
         assertEquals(0, secondStoreEvts.size());
     }
@@ -361,15 +363,10 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
         private Ignite ignite;
 
         /** {@inheritDoc} */
-        @Override public CacheStore create() {
+        @Override public synchronized CacheStore create() {
             String igniteInstanceName = ignite.name();
 
-            CacheStore store = firstStores.get(igniteInstanceName);
-
-            if (store == null)
-                store = F.addIfAbsent(firstStores, igniteInstanceName, new TestStore());
-
-            return store;
+            return firstStores.computeIfAbsent(igniteInstanceName, (k) -> new TestStore());
         }
     }
 
@@ -384,12 +381,7 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
         @Override public CacheStore create() {
             String igniteInstanceName = ignite.name();
 
-            CacheStore store = secondStores.get(igniteInstanceName);
-
-            if (store == null)
-                store = F.addIfAbsent(secondStores, igniteInstanceName, new TestStore());
-
-            return store;
+            return secondStores.computeIfAbsent(igniteInstanceName, (k) -> new TestStore());
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 61a076e..13a1044 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -44,6 +44,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterGroup;
@@ -877,6 +881,76 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks that three tasks submitted to a three-thread pool manage to meet at a three-party barrier.
+     */
+    public void testDoInParallel() throws Throwable {
+        CyclicBarrier barrier = new CyclicBarrier(3);
+
+        IgniteUtils.doInParallel(3,
+            Executors.newFixedThreadPool(3),
+            Arrays.asList(1, 2, 3),
+            i -> {
+                try {
+                    barrier.await(1, TimeUnit.SECONDS);
+                }
+                catch (Exception e) {
+                    throw new IgniteCheckedException(e);
+                }
+            }
+        );
+    }
+
+    /**
+     * Expects a barrier timeout when the first argument (presumably the parallelism level) is 2 for 3 parties.
+     */
+    public void testDoInParallelBatch() {
+        CyclicBarrier barrier = new CyclicBarrier(3);
+
+        try {
+            IgniteUtils.doInParallel(2,
+                Executors.newFixedThreadPool(3),
+                Arrays.asList(1, 2, 3),
+                i -> {
+                    try {
+                        barrier.await(400, TimeUnit.MILLISECONDS);
+                    }
+                    catch (Exception e) {
+                        throw new IgniteCheckedException(e);
+                    }
+                }
+            );
+
+            fail("Should throw timeout exception");
+        }
+        catch (Exception e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+    }
+
+    /**
+     * Checks that an exception thrown by one of the tasks is rethrown to the caller of {@code doInParallel}.
+     */
+    public void testDoInParallelException() {
+        String expectedException = "ExpectedException";
+
+        try {
+            IgniteUtils.doInParallel(3,
+                Executors.newFixedThreadPool(1),
+                Arrays.asList(1, 2, 3),
+                i -> {
+                    if (i == 1)
+                        throw new IgniteCheckedException(expectedException);
+                }
+            );
+
+            fail("Should throw IgniteCheckedException");
+        }
+        catch (IgniteCheckedException e) {
+            assertEquals(expectedException, e.getMessage());
+        }
+    }
+
+    /**
      * Test enum.
      */
     private enum TestEnum {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
index d0734a8..0381a1f 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.distributed.Cache64kPartition
 import org.apache.ignite.internal.processors.cache.distributed.CacheDataLossOnPartitionMoveTest;
 import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheRentingStateRepairTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheStartInParallelTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheStartWithLoadTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingPartitionCountersTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingWithAsyncClearingTest;
@@ -99,6 +100,8 @@ public class IgniteCacheTestSuite7 extends TestSuite {
 
         suite.addTestSuite(CacheRentingStateRepairTest.class);
 
+        suite.addTestSuite(CacheStartInParallelTest.class);
+
         suite.addTestSuite(TransactionIntegrityWithPrimaryIndexCorruptionTest.class);
         suite.addTestSuite(CacheDataLossOnPartitionMoveTest.class);