You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/10/22 13:33:25 UTC
ignite git commit: IGNITE-8006 Parallelize cache groups start - Fixes
#4752.
Repository: ignite
Updated Branches:
refs/heads/master e5a467272 -> e1f8f46f9
IGNITE-8006 Parallelize cache groups start - Fixes #4752.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e1f8f46f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e1f8f46f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e1f8f46f
Branch: refs/heads/master
Commit: e1f8f46f90868d377bc764b74d07812150218c71
Parents: e5a4672
Author: Anton Kalashnikov <ka...@yandex.ru>
Authored: Mon Oct 22 16:27:49 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Oct 22 16:27:49 2018 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 7 +
.../cache/CacheAffinitySharedManager.java | 115 ++--
.../processors/cache/GridCacheIoManager.java | 22 +-
.../GridCachePartitionExchangeManager.java | 4 +-
.../processors/cache/GridCacheProcessor.java | 523 ++++++++++++++-----
.../processors/cache/StartCacheInfo.java | 113 ++++
.../GridDhtPartitionsExchangeFuture.java | 78 ++-
.../ignite/internal/util/IgniteUtils.java | 104 ++--
.../internal/util/InitializationProtector.java | 79 +++
.../util/lang/IgniteThrowableConsumer.java | 37 ++
.../util/lang/IgniteThrowableRunner.java | 30 ++
.../distributed/CacheStartInParallelTest.java | 219 ++++++++
.../IgniteCrossCacheTxStoreSelfTest.java | 44 +-
.../internal/util/IgniteUtilsSelfTest.java | 74 +++
.../testsuites/IgniteCacheTestSuite7.java | 3 +
15 files changed, 1165 insertions(+), 287 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 521222c..6afe244 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1030,6 +1030,13 @@ public final class IgniteSystemProperties {
public static final String IGNITE_REBALANCE_THROTTLE_OVERRIDE = "IGNITE_REBALANCE_THROTTLE_OVERRIDE";
/**
+ * Enables starting caches in parallel.
+ *
+ * Default is {@code true}.
+ */
+ public static final String IGNITE_ALLOW_START_CACHES_IN_PARALLEL = "IGNITE_ALLOW_START_CACHES_IN_PARALLEL";
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index cedbde1..6e10c00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -17,12 +17,14 @@
package org.apache.ignite.internal.processors.cache;
+import javax.cache.CacheException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -30,7 +32,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
-import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
@@ -427,25 +428,43 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
Map<Integer, GridDhtAssignmentFetchFuture> fetchFuts = U.newHashMap(startDescs.size());
- Set<String> startedCaches = U.newHashSet(startDescs.size());
-
Map<Integer, Boolean> startedInfos = U.newHashMap(startDescs.size());
- for (DynamicCacheDescriptor desc : startDescs) {
- try {
- startedCaches.add(desc.cacheName());
+ List<StartCacheInfo> startCacheInfos = startDescs.stream()
+ .map(desc -> {
+ DynamicCacheChangeRequest changeReq = startReqs.get(desc.cacheName());
- DynamicCacheChangeRequest startReq = startReqs.get(desc.cacheName());
-
- cctx.cache().prepareCacheStart(
- desc.cacheConfiguration(),
+ return new StartCacheInfo(
desc,
- startReq.nearCacheConfiguration(),
+ changeReq.nearCacheConfiguration(),
topVer,
- startReq.disabledAfterStart()
+ changeReq.disabledAfterStart()
);
+ })
+ .collect(Collectors.toList());
+
+ Set<String> startedCaches = startCacheInfos.stream()
+ .map(info -> info.getCacheDescriptor().cacheName())
+ .collect(Collectors.toSet());
+
+ try {
+ cctx.cache().prepareStartCaches(startCacheInfos);
+ }
+ catch (IgniteCheckedException e) {
+ cctx.cache().closeCaches(startedCaches, false);
+
+ cctx.cache().completeClientCacheChangeFuture(msg.requestId(), e);
+
+ return null;
+ }
+
+ for (StartCacheInfo startCacheInfo : startCacheInfos) {
+ try {
+ DynamicCacheDescriptor desc = startCacheInfo.getCacheDescriptor();
+
+ startedCaches.add(desc.cacheName());
- startedInfos.put(desc.cacheId(), startReq.nearCacheConfiguration() != null);
+ startedInfos.put(desc.cacheId(), startCacheInfo.getReqNearCfg() != null);
CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
@@ -860,6 +879,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
long time = System.currentTimeMillis();
+ Map<StartCacheInfo, DynamicCacheChangeRequest> startCacheInfos = new LinkedHashMap<>();
+
for (ExchangeActions.CacheActionData action : exchActions.cacheStartRequests()) {
DynamicCacheDescriptor cacheDesc = action.descriptor();
@@ -895,29 +916,41 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
}
- try {
- if (startCache) {
- cctx.cache().prepareCacheStart(
+ if (startCache) {
+ startCacheInfos.put(
+ new StartCacheInfo(
req.startCacheConfiguration(),
cacheDesc,
nearCfg,
evts.topologyVersion(),
req.disabledAfterStart()
- );
-
- if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
- if (fut.events().discoveryCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())
- U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
- }
- }
+ ),
+ req
+ );
}
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to initialize cache. Will try to rollback cache start routine. " +
- "[cacheName=" + req.cacheName() + ']', e);
+ }
+
+ Map<StartCacheInfo, IgniteCheckedException> failedCaches = cctx.cache().prepareStartCachesIfPossible(startCacheInfos.keySet());
- cctx.cache().closeCaches(Collections.singleton(req.cacheName()), false);
+ failedCaches.forEach((cacheInfo, exception) -> {
+ U.error(log, "Failed to initialize cache. Will try to rollback cache start routine. " +
+ "[cacheName=" + cacheInfo.getStartedConfiguration().getName() + ']', exception);
- cctx.cache().completeCacheStartFuture(req, false, e);
+ cctx.cache().closeCaches(Collections.singleton(cacheInfo.getStartedConfiguration().getName()), false);
+
+ cctx.cache().completeCacheStartFuture(startCacheInfos.get(cacheInfo), false, exception);
+ });
+
+ Set<StartCacheInfo> failedCacheInfos = failedCaches.keySet();
+
+ List<StartCacheInfo> cacheInfos = startCacheInfos.keySet().stream()
+ .filter(failedCacheInfos::contains)
+ .collect(Collectors.toList());
+
+ for (StartCacheInfo info : cacheInfos) {
+ if (fut.cacheAddedOnExchange(info.getCacheDescriptor().cacheId(), info.getCacheDescriptor().receivedFrom())) {
+ if (fut.events().discoveryCache().cacheGroupAffinityNodes(info.getCacheDescriptor().groupId()).isEmpty())
+ U.quietAndWarn(log, "No server nodes found for cache client: " + info.getCacheDescriptor().cacheName());
}
}
@@ -952,22 +985,20 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
U.doInParallel(
cctx.kernalContext().getSystemExecutorService(),
startedGroups,
- new IgniteInClosureX<CacheGroupDescriptor>() {
- @Override public void applyx(CacheGroupDescriptor grpDesc) throws IgniteCheckedException {
- if (crd)
- initStartedGroupOnCoordinator(fut, grpDesc);
- else {
- CacheGroupContext grp = cctx.cache().cacheGroup(grpDesc.groupId());
+ grpDesc -> {
+ if (crd)
+ initStartedGroupOnCoordinator(fut, grpDesc);
+ else {
+ CacheGroupContext grp = cctx.cache().cacheGroup(grpDesc.groupId());
- if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.initialVersion())) {
- assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion();
+ if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.initialVersion())) {
+ assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion();
- initAffinity(cachesRegistry.group(grp.groupId()), grp.affinity(), fut);
- }
+ initAffinity(cachesRegistry.group(grp.groupId()), grp.affinity(), fut);
}
}
- },
- null);
+ }
+ );
}
/**
@@ -1228,7 +1259,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
.collect(Collectors.toList());
try {
- U.doInParallel(cctx.kernalContext().getSystemExecutorService(), affinityCaches, c, null);
+ U.doInParallel(cctx.kernalContext().getSystemExecutorService(), affinityCaches, c::applyx);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to execute affinity operation on cache groups", e);
@@ -1255,7 +1286,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
try {
- U.doInParallel(cctx.kernalContext().getSystemExecutorService(), affinityCaches, c, null);
+ U.doInParallel(cctx.kernalContext().getSystemExecutorService(), affinityCaches, c::applyx);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to execute affinity operation on cache groups", e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 2e66e5b..3116d31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -1340,22 +1340,22 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (msgIdx != -1) {
Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers;
- IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(hndId);
+ IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.compute(hndId, (key, clsHandlers) -> {
+ if (clsHandlers == null)
+ clsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX];
- if (cacheClsHandlers == null) {
- cacheClsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX];
+ if(clsHandlers[msgIdx] != null)
+ return null;
- idxClsHandlers0.put(hndId, cacheClsHandlers);
- }
+ clsHandlers[msgIdx] = c;
+
+ return clsHandlers;
+ });
- if (cacheClsHandlers[msgIdx] != null)
+ if (cacheClsHandlers == null)
throw new IgniteException("Duplicate cache message ID found [hndId=" + hndId +
", type=" + type + ']');
- cacheClsHandlers[msgIdx] = c;
-
- msgHandlers.idxClsHandlers = idxClsHandlers0;
-
return;
}
else {
@@ -1572,7 +1572,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
*/
static class MessageHandlers {
/** Indexed class handlers. */
- volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>();
+ volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new ConcurrentHashMap<>();
/** Handler registry. */
ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>>
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 8b8efa3..b0e0d0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -109,6 +109,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -2622,7 +2623,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
err = e;
}
catch (Throwable e) {
- err = e;
+ if (!(stop && X.hasCause(e, IgniteInterruptedCheckedException.class)))
+ err = e;
}
finally {
if (err == null && !stop && !reconnectNeeded)
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 4a6bed4..59703c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache;
+import javax.management.MBeanServer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -68,6 +69,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteComponentType;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.IgniteTransactionsEx;
import org.apache.ignite.internal.binary.BinaryContext;
@@ -136,15 +138,16 @@ import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.util.F0;
+import org.apache.ignite.internal.util.InitializationProtector;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainClosure;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -193,6 +196,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_C
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistentCache;
+import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;
/**
* Cache processor.
@@ -214,6 +218,10 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
private final boolean walFsyncWithDedicatedWorker =
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER, false);
+ /** Enables starting caches in parallel. */
+ private final boolean IGNITE_ALLOW_START_CACHES_IN_PARALLEL =
+ IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, true);
+
/** Shared cache context. */
private GridCacheSharedContext<?, ?> sharedCtx;
@@ -266,6 +274,9 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
/** MBean group for cache group metrics */
private final String CACHE_GRP_METRICS_MBEAN_GRP = "Cache groups";
+ /** Protector of initialization of specific value. */
+ private final InitializationProtector initializationProtector = new InitializationProtector();
+
/**
* @param ctx Kernal context.
*/
@@ -1285,71 +1296,6 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
}
/**
- * @param cache Cache to start.
- * @param schema Cache schema.
- * @throws IgniteCheckedException If failed to start cache.
- */
- @SuppressWarnings({"TypeMayBeWeakened", "unchecked"})
- private void startCache(GridCacheAdapter<?, ?> cache, QuerySchema schema) throws IgniteCheckedException {
- GridCacheContext<?, ?> cacheCtx = cache.context();
-
- CacheConfiguration cfg = cacheCtx.config();
-
- // Intentionally compare Boolean references using '!=' below to check if the flag has been explicitly set.
- if (cfg.isStoreKeepBinary() && cfg.isStoreKeepBinary() != CacheConfiguration.DFLT_STORE_KEEP_BINARY
- && !(ctx.config().getMarshaller() instanceof BinaryMarshaller))
- U.warn(log, "CacheConfiguration.isStoreKeepBinary() configuration property will be ignored because " +
- "BinaryMarshaller is not used");
-
- // Start managers.
- for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx))))
- mgr.start(cacheCtx);
-
- cacheCtx.initConflictResolver();
-
- if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) {
- GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context();
-
- // Start DHT managers.
- for (GridCacheManager mgr : dhtManagers(dhtCtx))
- mgr.start(dhtCtx);
-
- dhtCtx.initConflictResolver();
-
- // Start DHT cache.
- dhtCtx.cache().start();
-
- if (log.isDebugEnabled())
- log.debug("Started DHT cache: " + dhtCtx.cache().name());
- }
-
- ctx.continuous().onCacheStart(cacheCtx);
-
- cacheCtx.cache().start();
-
- ctx.query().onCacheStart(cacheCtx, schema);
-
- cacheCtx.onStarted();
-
- String memPlcName = cfg.getDataRegionName();
-
- if (memPlcName == null && ctx.config().getDataStorageConfiguration() != null)
- memPlcName = ctx.config().getDataStorageConfiguration().getDefaultDataRegionConfiguration().getName();
-
- if (log.isInfoEnabled()) {
- log.info("Started cache [name=" + cfg.getName() +
- ", id=" + cacheCtx.cacheId() +
- (cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") +
- ", memoryPolicyName=" + memPlcName +
- ", mode=" + cfg.getCacheMode() +
- ", atomicity=" + cfg.getAtomicityMode() +
- ", backups=" + cfg.getBackups() +
- ", mvcc=" + cacheCtx.mvccEnabled() +']' +
- ", encryptionEnabled=" + cfg.isEncryptionEnabled() +']');
- }
- }
-
- /**
* @param cache Cache to stop.
* @param cancel Cancel flag.
* @param destroy Destroy data flag. Setting to <code>true</code> will remove all cache data.
@@ -1600,7 +1546,13 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
GridCacheDrManager drMgr = pluginMgr.createComponent(GridCacheDrManager.class);
CacheStoreManager storeMgr = pluginMgr.createComponent(CacheStoreManager.class);
- storeMgr.initialize(cfgStore, sesHolders);
+ if (cfgStore == null)
+ storeMgr.initialize(cfgStore, sesHolders);
+ else
+ initializationProtector.protect(
+ cfgStore,
+ () -> storeMgr.initialize(cfgStore, sesHolders)
+ );
GridCacheContext<?, ?> cacheCtx = new GridCacheContext(
ctx,
@@ -2017,18 +1969,13 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
IgniteInternalFuture<?> res = sharedCtx.affinity().initCachesOnLocalJoin(
locJoinCtx.cacheGroupDescriptors(), locJoinCtx.cacheDescriptors());
- for (T2<DynamicCacheDescriptor, NearCacheConfiguration> t : locJoinCtx.caches()) {
- DynamicCacheDescriptor desc = t.get1();
+ List<StartCacheInfo> startCacheInfos = locJoinCtx.caches().stream()
+ .map(cacheInfo -> new StartCacheInfo(cacheInfo.get1(), cacheInfo.get2(), exchTopVer, false))
+ .collect(Collectors.toList());
- prepareCacheStart(
- desc.cacheConfiguration(),
- desc,
- t.get2(),
- exchTopVer,
- false);
+ prepareStartCaches(startCacheInfos);
- context().exchange().exchangerUpdateHeartbeat();
- }
+ context().exchange().exchangerUpdateHeartbeat();
if (log.isInfoEnabled())
log.info("Starting caches on local join performed in " + (System.currentTimeMillis() - time) + " ms.");
@@ -2054,24 +2001,164 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
*/
public Collection<DynamicCacheDescriptor> startReceivedCaches(UUID nodeId, AffinityTopologyVersion exchTopVer)
throws IgniteCheckedException {
- List<DynamicCacheDescriptor> started = cachesInfo.cachesReceivedFromJoin(nodeId);
+ List<DynamicCacheDescriptor> receivedCaches = cachesInfo.cachesReceivedFromJoin(nodeId);
- for (DynamicCacheDescriptor desc : started) {
- IgnitePredicate<ClusterNode> filter = desc.groupDescriptor().config().getNodeFilter();
+ List<StartCacheInfo> startCacheInfos = receivedCaches.stream()
+ .filter(desc -> isLocalAffinity(desc.groupDescriptor().config()))
+ .map(desc -> new StartCacheInfo(desc, null, exchTopVer, false))
+ .collect(Collectors.toList());
- if (CU.affinityNode(ctx.discovery().localNode(), filter)) {
- prepareCacheStart(
- desc.cacheConfiguration(),
- desc,
- null,
- exchTopVer,
- false);
+ prepareStartCaches(startCacheInfos);
+
+ return receivedCaches;
+ }
+
+ /**
+ * @param cacheConfiguration Configuration whose node filter is checked against the local node.
+ * @return {@code true} if the local node passes the node filter, i.e. it is an affinity node for the cache.
+ */
+ private boolean isLocalAffinity(CacheConfiguration cacheConfiguration) {
+ return CU.affinityNode(ctx.discovery().localNode(), cacheConfiguration.getNodeFilter());
+ }
+
+ /**
+ * Starts all input caches using a pass-through fail handler, so a start failure is not caught here.
+ *
+ * @param startCacheInfos All caches information for start.
+ */
+ void prepareStartCaches(Collection<StartCacheInfo> startCacheInfos) throws IgniteCheckedException {
+ prepareStartCaches(startCacheInfos, (data, operation) -> {
+ operation.accept(data);// Pass-through: run the start operation as is, without catching its exceptions.
+ });
+ }
+
+ /**
+ * Tries to start all input caches and collects the caches which failed to start instead of failing fast.
+ *
+ * @param startCacheInfos Caches info for start.
+ * @return Caches which failed to start, mapped to the exception that caused the failure.
+ * @throws IgniteCheckedException If failed; {@link IgniteInterruptedCheckedException} is always rethrown.
+ */
+ Map<StartCacheInfo, IgniteCheckedException> prepareStartCachesIfPossible(Collection<StartCacheInfo> startCacheInfos) throws IgniteCheckedException {
+ Map<StartCacheInfo, IgniteCheckedException> failedCaches = new ConcurrentHashMap<>(); // Handler runs inside doInParallel tasks.
+
+ prepareStartCaches(startCacheInfos, (data, operation) -> {
+ try {
+ operation.accept(data);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw e;
+ }
+ catch (IgniteCheckedException e) {
+ log.warning("Cache can not be started : cache=" + data.getStartedConfiguration().getName());
+
+ failedCaches.put(data, e);
+ }
+ });
+
+ return failedCaches;
+ }
+
+ /**
+ * Start all input caches in parallel.
+ *
+ * @param startCacheInfos All caches information for start.
+ * @param cacheStartFailHandler Fail handler for one cache start.
+ */
+ private void prepareStartCaches(
+ Collection<StartCacheInfo> startCacheInfos,
+ StartCacheFailHandler<StartCacheInfo> cacheStartFailHandler
+ ) throws IgniteCheckedException {
+ if (!IGNITE_ALLOW_START_CACHES_IN_PARALLEL || startCacheInfos.size() <= 1) {
+ for (StartCacheInfo startCacheInfo : startCacheInfos) {
+ cacheStartFailHandler.handle(
+ startCacheInfo,
+ cacheInfo -> prepareCacheStart(
+ cacheInfo.getCacheDescriptor().cacheConfiguration(),
+ cacheInfo.getCacheDescriptor(),
+ cacheInfo.getReqNearCfg(),
+ cacheInfo.getExchangeTopVer(),
+ cacheInfo.isDisabledAfterStart()
+ )
+ );
context().exchange().exchangerUpdateHeartbeat();
}
}
+ else {
+ Map<StartCacheInfo, GridCacheContext> cacheContexts = new ConcurrentHashMap<>();
+
+ int parallelismLvl = sharedCtx.kernalContext().config().getSystemThreadPoolSize();
+
+ // Reserve at least 2 threads for system operations.
+ parallelismLvl = Math.max(1, parallelismLvl - 2);
+
+ doInParallel(
+ parallelismLvl,
+ sharedCtx.kernalContext().getSystemExecutorService(),
+ startCacheInfos,
+ startCacheInfo ->
+ cacheStartFailHandler.handle(
+ startCacheInfo,
+ cacheInfo -> {
+ GridCacheContext cacheCtx = prepareCacheContext(
+ cacheInfo.getCacheDescriptor().cacheConfiguration(),
+ cacheInfo.getCacheDescriptor(),
+ cacheInfo.getReqNearCfg(),
+ cacheInfo.getExchangeTopVer(),
+ cacheInfo.isDisabledAfterStart()
+ );
+ cacheContexts.put(cacheInfo, cacheCtx);
+
+ context().exchange().exchangerUpdateHeartbeat();
+ }
+ )
+ );
- return started;
+ /*
+ * This hack is required because we can't start SQL schemas in parallel for the following reasons:
+ * * checking indexes for duplicates (and other checks) requires the same order on every node.
+ * * onCacheStart and createSchema contain a lot of mutexes.
+ *
+ * TODO IGNITE-9729
+ */
+ Set<StartCacheInfo> successfullyPreparedCaches = cacheContexts.keySet();
+
+ List<StartCacheInfo> cacheInfosInOriginalOrder = startCacheInfos.stream()
+ .filter(successfullyPreparedCaches::contains)
+ .collect(Collectors.toList());
+
+ for (StartCacheInfo startCacheInfo : cacheInfosInOriginalOrder) {
+ cacheStartFailHandler.handle(
+ startCacheInfo,
+ cacheInfo -> {
+ ctx.query().onCacheStart(
+ cacheContexts.get(cacheInfo),
+ cacheInfo.getCacheDescriptor().schema() != null
+ ? cacheInfo.getCacheDescriptor().schema()
+ : new QuerySchema()
+ );
+
+ context().exchange().exchangerUpdateHeartbeat();
+ }
+ );
+ }
+
+ doInParallel(
+ parallelismLvl,
+ sharedCtx.kernalContext().getSystemExecutorService(),
+ cacheContexts.entrySet(),
+ cacheCtxEntry ->
+ cacheStartFailHandler.handle(
+ cacheCtxEntry.getKey(),
+ cacheInfo -> {
+ onCacheStarted(cacheCtxEntry.getValue());
+
+ context().exchange().exchangerUpdateHeartbeat();
+ }
+ )
+ );
+ }
}
/**
@@ -2090,71 +2177,156 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
AffinityTopologyVersion exchTopVer,
boolean disabledAfterStart
) throws IgniteCheckedException {
+ GridCacheContext cacheCtx = prepareCacheContext(startCfg, desc, reqNearCfg, exchTopVer, disabledAfterStart);
+
+ ctx.query().onCacheStart(cacheCtx, desc.schema() != null ? desc.schema() : new QuerySchema());
+
+ onCacheStarted(cacheCtx);
+ }
+
+ /**
+ * Preparing cache context to start.
+ *
+ * @param startCfg Cache configuration to use.
+ * @param desc Cache descriptor.
+ * @param reqNearCfg Near configuration if specified for client cache start request.
+ * @param exchTopVer Current exchange version.
+ * @param disabledAfterStart If true, then we will discard restarting state from proxies. If false then we will change
+ * state of proxies to restarting
+ * @return Created {@link GridCacheContext}.
+ * @throws IgniteCheckedException if failed.
+ */
+ private GridCacheContext prepareCacheContext(
+ CacheConfiguration startCfg,
+ DynamicCacheDescriptor desc,
+ @Nullable NearCacheConfiguration reqNearCfg,
+ AffinityTopologyVersion exchTopVer,
+ boolean disabledAfterStart
+ ) throws IgniteCheckedException {
assert !caches.containsKey(startCfg.getName()) : startCfg.getName();
CacheConfiguration ccfg = new CacheConfiguration(startCfg);
CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
- boolean affNode;
+ boolean affNode = checkForAffinityNode(desc, reqNearCfg, ccfg);
- if (ccfg.getCacheMode() == LOCAL) {
- affNode = true;
+ preparePageStore(desc, affNode);
+
+ CacheGroupContext grp = prepareCacheGroup(desc, exchTopVer, cacheObjCtx, affNode, startCfg.getGroupName());
+
+ GridCacheContext cacheCtx = createCache(ccfg,
+ grp,
+ null,
+ desc,
+ exchTopVer,
+ cacheObjCtx,
+ affNode,
+ true,
+ disabledAfterStart
+ );
+ initCacheContext(cacheCtx, ccfg, desc.deploymentId());
+
+ return cacheCtx;
+ }
+
+ /**
+ * Check for affinity node and customize near configuration if needed.
+ *
+ * @param desc Cache descriptor.
+ * @param reqNearCfg Near configuration if specified for client cache start request.
+ * @param ccfg Cache configuration to use.
+ * @return {@code true} if it is affinity node for cache.
+ */
+ private boolean checkForAffinityNode(
+ DynamicCacheDescriptor desc,
+ @Nullable NearCacheConfiguration reqNearCfg,
+ CacheConfiguration ccfg
+ ) {
+ if (ccfg.getCacheMode() == LOCAL) {
ccfg.setNearConfiguration(null);
- }
- else if (CU.affinityNode(ctx.discovery().localNode(), desc.groupDescriptor().config().getNodeFilter()))
- affNode = true;
- else {
- affNode = false;
- ccfg.setNearConfiguration(reqNearCfg);
+ return true;
}
- if (sharedCtx.pageStore() != null && affNode)
- sharedCtx.pageStore().initializeForCache(desc.groupDescriptor(), desc.toStoredData());
-
- String grpName = startCfg.getGroupName();
+ if (isLocalAffinity(desc.groupDescriptor().config()))
+ return true;
- CacheGroupContext grp = null;
+ ccfg.setNearConfiguration(reqNearCfg);
- if (grpName != null) {
- for (CacheGroupContext grp0 : cacheGrps.values()) {
- if (grp0.sharedGroup() && grpName.equals(grp0.name())) {
- grp = grp0;
+ return false;
+ }
- break;
- }
- }
+ /**
+ * Prepare page store for start cache.
+ *
+ * @param desc Cache descriptor.
+ * @param affNode {@code true} if it is affinity node for cache.
+ * @throws IgniteCheckedException if failed.
+ */
+ private void preparePageStore(DynamicCacheDescriptor desc, boolean affNode) throws IgniteCheckedException {
+ if (sharedCtx.pageStore() != null && affNode)
+ initializationProtector.protect(
+ desc.groupDescriptor().groupId(),
+ () -> sharedCtx.pageStore().initializeForCache(desc.groupDescriptor(), desc.toStoredData())
+ );
+ }
- if (grp == null) {
- grp = startCacheGroup(desc.groupDescriptor(),
+ /**
+ * Prepare cache group to start cache.
+ *
+ * @param desc Cache descriptor.
+ * @param exchTopVer Current exchange version.
+ * @param cacheObjCtx Cache object context.
+ * @param affNode {@code true} if it is affinity node for cache.
+ * @param grpName Group name.
+ * @return Prepared cache group context.
+ * @throws IgniteCheckedException if failed.
+ */
+ private CacheGroupContext prepareCacheGroup(
+ DynamicCacheDescriptor desc,
+ AffinityTopologyVersion exchTopVer,
+ CacheObjectContext cacheObjCtx,
+ boolean affNode,
+ String grpName
+ ) throws IgniteCheckedException {
+ if (grpName != null) {
+ return initializationProtector.protect(
+ desc.groupId(),
+ () -> findCacheGroup(grpName),
+ () -> startCacheGroup(
+ desc.groupDescriptor(),
desc.cacheType(),
affNode,
cacheObjCtx,
- exchTopVer);
- }
- }
- else {
- grp = startCacheGroup(desc.groupDescriptor(),
- desc.cacheType(),
- affNode,
- cacheObjCtx,
- exchTopVer);
+ exchTopVer
+ )
+ );
}
- GridCacheContext cacheCtx = createCache(ccfg,
- grp,
- null,
- desc,
- exchTopVer,
- cacheObjCtx,
+ return startCacheGroup(desc.groupDescriptor(),
+ desc.cacheType(),
affNode,
- true,
- disabledAfterStart
+ cacheObjCtx,
+ exchTopVer
);
+ }
- cacheCtx.dynamicDeploymentId(desc.deploymentId());
+ /**
+ * Initialize created cache context.
+ *
+ * @param cacheCtx Cache context to initialize.
+ * @param cfg Cache configuration.
+ * @param deploymentId Dynamic deployment ID.
+ * @throws IgniteCheckedException if failed.
+ */
+ private void initCacheContext(
+ GridCacheContext<?, ?> cacheCtx,
+ CacheConfiguration cfg,
+ IgniteUuid deploymentId
+ ) throws IgniteCheckedException {
+ cacheCtx.dynamicDeploymentId(deploymentId);
GridCacheAdapter cache = cacheCtx.cache();
@@ -2162,7 +2334,67 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
caches.put(cacheCtx.name(), cache);
- startCache(cache, desc.schema() != null ? desc.schema() : new QuerySchema());
+ // Intentionally compare Boolean references using '!=' below to check if the flag has been explicitly set.
+ if (cfg.isStoreKeepBinary() && cfg.isStoreKeepBinary() != CacheConfiguration.DFLT_STORE_KEEP_BINARY
+ && !(ctx.config().getMarshaller() instanceof BinaryMarshaller))
+ U.warn(log, "CacheConfiguration.isStoreKeepBinary() configuration property will be ignored because " +
+ "BinaryMarshaller is not used");
+
+ // Start managers.
+ for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx))))
+ mgr.start(cacheCtx);
+
+ cacheCtx.initConflictResolver();
+
+ if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) {
+ GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context();
+
+ // Start DHT managers.
+ for (GridCacheManager mgr : dhtManagers(dhtCtx))
+ mgr.start(dhtCtx);
+
+ dhtCtx.initConflictResolver();
+
+ // Start DHT cache.
+ dhtCtx.cache().start();
+
+ if (log.isDebugEnabled())
+ log.debug("Started DHT cache: " + dhtCtx.cache().name());
+ }
+
+ ctx.continuous().onCacheStart(cacheCtx);
+
+ cacheCtx.cache().start();
+ }
+
+ /**
+ * Handles a cache context which has been fully prepared.
+ *
+ * @param cacheCtx Fully prepared context.
+ * @throws IgniteCheckedException if failed.
+ */
+ private void onCacheStarted(GridCacheContext cacheCtx) throws IgniteCheckedException {
+ GridCacheAdapter cache = cacheCtx.cache();
+ CacheConfiguration cfg = cacheCtx.config();
+ CacheGroupContext grp = cacheGrps.get(cacheCtx.groupId());
+
+ cacheCtx.onStarted();
+
+ String dataRegion = cfg.getDataRegionName();
+
+ if (dataRegion == null && ctx.config().getDataStorageConfiguration() != null)
+ dataRegion = ctx.config().getDataStorageConfiguration().getDefaultDataRegionConfiguration().getName();
+
+ if (log.isInfoEnabled()) {
+ log.info("Started cache [name=" + cfg.getName() +
+ ", id=" + cacheCtx.cacheId() +
+ (cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") +
+ ", dataRegionName=" + dataRegion +
+ ", mode=" + cfg.getCacheMode() +
+ ", atomicity=" + cfg.getAtomicityMode() +
+ ", backups=" + cfg.getBackups() +
+ ", mvcc=" + cacheCtx.mvccEnabled() + ']');
+ }
grp.onCacheStarted(cacheCtx);
@@ -2170,6 +2402,17 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
}
/**
+ * @param grpName Group name.
+ * @return Found group or null.
+ */
+ private CacheGroupContext findCacheGroup(String grpName) {
+ return cacheGrps.values().stream()
+ .filter(grp -> grp.sharedGroup() && grpName.equals(grp.name()))
+ .findAny()
+ .orElse(null);
+ }
+
+ /**
* Restarts proxies of caches if they was marked as restarting. Requires external synchronization - shouldn't be
* called concurrently with another caches restart.
*/
@@ -4839,7 +5082,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
// Check if we were asked to start a near cache.
if (nearCfg != null) {
- if (CU.affinityNode(ctx.discovery().localNode(), descCfg.getNodeFilter())) {
+ if (isLocalAffinity(descCfg)) {
// If we are on a data node and near cache was enabled, return success, else - fail.
if (descCfg.getNearConfiguration() != null)
return null;
@@ -4851,7 +5094,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
// If local node has near cache, return success.
req.clientStartOnly(true);
}
- else if (!CU.affinityNode(ctx.discovery().localNode(), descCfg.getNodeFilter()))
+ else if (!isLocalAffinity(descCfg))
req.clientStartOnly(true);
req.deploymentId(desc.deploymentId());
@@ -5020,6 +5263,22 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor
}
/**
+ * Handler of failures during cache start.
+ *
+ * @param <T> Type of started data.
+ */
+ private static interface StartCacheFailHandler<T> {
+ /**
+ * Handles a failure.
+ *
+ * @param data Start data.
+ * @param startCacheOperation Operation for start cache.
+ * @throws IgniteCheckedException if failed.
+ */
+ void handle(T data, IgniteThrowableConsumer<T> startCacheOperation) throws IgniteCheckedException;
+ }
+
+ /**
*
*/
@SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java
new file mode 100644
index 0000000..a5aea26
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Specific cache information for start.
+ */
+public class StartCacheInfo {
+ /** Cache configuration for start. */
+ private final CacheConfiguration startedConf;
+
+ /** Cache descriptor for start. */
+ private final DynamicCacheDescriptor desc;
+
+ /** Near cache configuration for start. */
+ private final @Nullable NearCacheConfiguration reqNearCfg;
+
+ /** Exchange topology version in which starting happened. */
+ private final AffinityTopologyVersion exchTopVer;
+
+ /** Disable started cache after start or not. */
+ private final boolean disabledAfterStart;
+
+ /**
+ * @param desc Cache descriptor for start.
+ * @param reqNearCfg Near cache configuration for start.
+ * @param exchTopVer Exchange topology version in which starting happened.
+ * @param disabledAfterStart Disable started cache after start or not.
+ */
+ public StartCacheInfo(DynamicCacheDescriptor desc,
+ NearCacheConfiguration reqNearCfg,
+ AffinityTopologyVersion exchTopVer, boolean disabledAfterStart) {
+ this(desc.cacheConfiguration(), desc, reqNearCfg, exchTopVer, disabledAfterStart);
+ }
+
+ /**
+ * @param conf Cache configuration for start.
+ * @param desc Cache descriptor for start.
+ * @param reqNearCfg Near cache configuration for start.
+ * @param exchTopVer Exchange topology version in which starting happened.
+ * @param disabledAfterStart Disable started cache after start or not.
+ */
+ public StartCacheInfo(CacheConfiguration conf, DynamicCacheDescriptor desc,
+ NearCacheConfiguration reqNearCfg,
+ AffinityTopologyVersion exchTopVer, boolean disabledAfterStart) {
+ startedConf = conf;
+ this.desc = desc;
+ this.reqNearCfg = reqNearCfg;
+ this.exchTopVer = exchTopVer;
+ this.disabledAfterStart = disabledAfterStart;
+ }
+
+ /**
+ * @return Cache configuration for start.
+ */
+ public CacheConfiguration getStartedConfiguration() {
+ return startedConf;
+ }
+
+ /**
+ * @return Cache descriptor for start.
+ */
+ public DynamicCacheDescriptor getCacheDescriptor() {
+ return desc;
+ }
+
+ /**
+ * @return Near cache configuration for start.
+ */
+ @Nullable public NearCacheConfiguration getReqNearCfg() {
+ return reqNearCfg;
+ }
+
+ /**
+ * @return Exchange topology version in which starting happened.
+ */
+ public AffinityTopologyVersion getExchangeTopVer() {
+ return exchTopVer;
+ }
+
+ /**
+ * @return Disable started cache after start or not.
+ */
+ public boolean isDisabledAfterStart() {
+ return disabledAfterStart;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(StartCacheInfo.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d494857..0c2cbe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -3512,36 +3512,34 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
U.doInParallel(
cctx.kernalContext().getSystemExecutorService(),
nonLocalCacheGroupDescriptors(),
- new IgniteInClosureX<CacheGroupDescriptor>() {
- @Override public void applyx(CacheGroupDescriptor grpDesc) {
- CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpDesc.groupId());
-
- GridDhtPartitionTopology top = grpCtx != null
- ? grpCtx.topology()
- : cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache());
-
- // Do not validate read or write through caches or caches with disabled rebalance
- // or ExpiryPolicy is set or validation is disabled.
- if (grpCtx == null
- || grpCtx.config().isReadThrough()
- || grpCtx.config().isWriteThrough()
- || grpCtx.config().getCacheStoreFactory() != null
- || grpCtx.config().getRebalanceDelay() == -1
- || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE
- || grpCtx.config().getExpiryPolicyFactory() == null
- || SKIP_PARTITION_SIZE_VALIDATION)
- return;
+ grpDesc -> {
+ CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpDesc.groupId());
+
+ GridDhtPartitionTopology top = grpCtx != null
+ ? grpCtx.topology()
+ : cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache());
+
+ // Do not validate read or write through caches or caches with disabled rebalance
+ // or ExpiryPolicy is set or validation is disabled.
+ if (grpCtx == null
+ || grpCtx.config().isReadThrough()
+ || grpCtx.config().isWriteThrough()
+ || grpCtx.config().getCacheStoreFactory() != null
+ || grpCtx.config().getRebalanceDelay() == -1
+ || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE
+ || grpCtx.config().getExpiryPolicyFactory() == null
+ || SKIP_PARTITION_SIZE_VALIDATION)
+ return;
- try {
- validator.validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture.this, top, msgs);
- }
- catch (IgniteCheckedException ex) {
- log.warning("Partition states validation has failed for group: " + grpCtx.cacheOrGroupName() + ". " + ex.getMessage());
- // TODO: Handle such errors https://issues.apache.org/jira/browse/IGNITE-7833
- }
+ try {
+ validator.validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture.this, top, msgs);
}
- },
- null);
+ catch (IgniteCheckedException ex) {
+ log.warning("Partition states validation has failed for group: " + grpCtx.cacheOrGroupName() + ". " + ex.getMessage());
+ // TODO: Handle such errors https://issues.apache.org/jira/browse/IGNITE-7833
+ }
+ }
+ );
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to validate partitions state", e);
@@ -3561,21 +3559,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
U.doInParallel(
cctx.kernalContext().getSystemExecutorService(),
nonLocalCacheGroupDescriptors(),
- new IgniteInClosureX<CacheGroupDescriptor>() {
- @Override public void applyx(CacheGroupDescriptor grpDesc) {
- CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpDesc.groupId());
+ grpDesc -> {
+ CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpDesc.groupId());
- GridDhtPartitionTopology top = grpCtx != null
- ? grpCtx.topology()
- : cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache());
+ GridDhtPartitionTopology top = grpCtx != null
+ ? grpCtx.topology()
+ : cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache());
- if (!CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration()))
- assignPartitionSizes(top);
- else
- assignPartitionStates(top);
- }
- },
- null);
+ if (!CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration()))
+ assignPartitionSizes(top);
+ else
+ assignPartitionStates(top);
+ }
+ );
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to assign partition states", e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index e6f374a..2fe0eb8 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -142,6 +142,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipInputStream;
@@ -216,19 +217,16 @@ import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryNativeLoader;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
import org.apache.ignite.internal.util.lang.GridTuple;
-import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFutureCancelledException;
@@ -236,6 +234,7 @@ import org.apache.ignite.lang.IgniteFutureTimeoutException;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.marshaller.Marshaller;
@@ -10739,54 +10738,91 @@ public abstract class IgniteUtils {
}
/**
+ * Execute operation on data in parallel.
+ *
* @param executorSvc Service for parallel execution.
* @param srcDatas List of data for parallelization.
- * @param consumer Logic for execution of on each item of data.
- * @param errHnd Optionan error handler. If not {@code null}, an error of each item execution will be passed to
- * this handler. If error handler is not {@code null}, the exception will not be thrown from this method.
+ * @param operation Logic to execute on each item of data.
* @param <T> Type of data.
- * @return List of (item, execution future) tuples.
- * @throws IgniteCheckedException If parallel execution failed and {@code errHnd} is {@code null}.
+ * @throws IgniteCheckedException if parallel execution failed.
*/
- public static <T> List<T2<T, Future<Object>>> doInParallel(
+ public static <T> void doInParallel(ExecutorService executorSvc, Collection<T> srcDatas,
+ IgniteThrowableConsumer<T> operation) throws IgniteCheckedException, IgniteInterruptedCheckedException {
+ doInParallel(srcDatas.size(), executorSvc, srcDatas, operation);
+ }
+
+ /**
+ * Execute operation on data in parallel.
+ *
+ * @param parallelismLvl Number of threads on which it should be executed.
+ * @param executorSvc Service for parallel execution.
+ * @param srcDatas List of data for parallelization.
+ * @param operation Logic to execute on each item of data.
+ * @param <T> Type of data.
+ * @throws IgniteCheckedException if parallel execution failed.
+ */
+ public static <T> void doInParallel(
+ int parallelismLvl,
ExecutorService executorSvc,
Collection<T> srcDatas,
- IgniteInClosureX<T> consumer,
- @Nullable IgniteBiInClosure<T, Throwable> errHnd
- ) throws IgniteCheckedException {
- List<T2<T, Future<Object>>> consumerFutures = srcDatas.stream()
- .map(item -> new T2<>(
- item,
- executorSvc.submit(() -> {
- consumer.apply(item);
+ IgniteThrowableConsumer<T> operation
+ ) throws IgniteCheckedException, IgniteInterruptedCheckedException {
+ List<List<T>> batches = IntStream.range(0, parallelismLvl)
+ .mapToObj(i -> new ArrayList<T>())
+ .collect(Collectors.toList());
- return null;
- })))
+ int i = 0;
+
+ for (T src : srcDatas)
+ batches.get(i++ % parallelismLvl).add(src);
+
+ List<Future<Object>> consumerFutures = batches.stream()
+ .filter(batch -> !batch.isEmpty())
+ .map(batch -> executorSvc.submit(() -> {
+ for (T item : batch)
+ operation.accept(item);
+
+ return null;
+ }))
.collect(Collectors.toList());
- IgniteCheckedException composite = null;
+ Throwable error =null;
- for (T2<T, Future<Object>> tup : consumerFutures) {
+ for (Future<Object> future : consumerFutures) {
try {
- getUninterruptibly(tup.get2());
+ future.get();
}
- catch (ExecutionException e) {
- if (errHnd != null)
- errHnd.apply(tup.get1(), e.getCause());
- else {
- if (composite == null)
- composite = new IgniteCheckedException("Failed to execute one of the tasks " +
- "(see suppressed exception for details)");
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
- composite.addSuppressed(e.getCause());
- }
+ throw new IgniteInterruptedCheckedException(e);
+ }
+ catch (ExecutionException e) {
+ if(error == null)
+ error = e.getCause();
+ else
+ error.addSuppressed(e.getCause());
+ }
+ catch (CancellationException e) {
+ if(error == null)
+ error = e;
+ else
+ error.addSuppressed(e);
}
}
- if (composite != null)
- throw composite;
+ if (error != null) {
+ if (error instanceof IgniteCheckedException)
+ throw (IgniteCheckedException)error;
- return consumerFutures;
+ if (error instanceof RuntimeException)
+ throw (RuntimeException)error;
+
+ if (error instanceof Error)
+ throw (Error)error;
+
+ throw new IgniteCheckedException(error);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/util/InitializationProtector.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/InitializationProtector.java b/modules/core/src/main/java/org/apache/ignite/internal/util/InitializationProtector.java
new file mode 100644
index 0000000..7c501c4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/InitializationProtector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
+
+/**
+ * Class for avoiding multiple initialization of a specific value from various threads.
+ */
+public class InitializationProtector {
+ /** Default striped lock concurrency level. */
+ private static final int DEFAULT_CONCURRENCY_LEVEL = Runtime.getRuntime().availableProcessors();
+
+ /** Striped lock. */
+ private GridStripedLock stripedLock = new GridStripedLock(DEFAULT_CONCURRENCY_LEVEL);
+
+ /**
+ * @param protectedKey Unique value by which initialization code should be run only one time.
+ * @param initializedVal Supplier of the already initialized value if it exists, or of null as a sign that
+ * initialization is required.
+ * @param initializationCode Code for initialization value corresponding protectedKey. Should be idempotent.
+ * @param <T> Type of initialization value.
+ * @return Initialized value.
+ * @throws IgniteCheckedException if initialization failed.
+ */
+ public <T> T protect(Object protectedKey, Supplier<T> initializedVal,
+ IgniteThrowableRunner initializationCode) throws IgniteCheckedException {
+ T value = initializedVal.get();
+
+ if (value != null)
+ return value;
+
+ Lock lock = stripedLock.getLock(protectedKey.hashCode() % stripedLock.concurrencyLevel());
+
+ lock.lock();
+ try {
+ value = initializedVal.get();
+
+ if (value != null)
+ return value;
+
+ initializationCode.run();
+
+ return initializedVal.get();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * This method allows avoiding simultaneous initialization from various threads.
+ *
+ * @param protectedKey Unique value by which initialization code should be run by only one thread at a time.
+ * @param initializationCode Code for initialization value corresponding protectedKey. Should be idempotent.
+ * @throws IgniteCheckedException if initialization failed.
+ */
+ public void protect(Object protectedKey, IgniteThrowableRunner initializationCode) throws IgniteCheckedException {
+ protect(protectedKey, () -> null, initializationCode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java
new file mode 100644
index 0000000..46813a9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.lang;
+
+import java.io.Serializable;
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Represents an operation that accepts a single input argument and returns no result. Unlike most other functional
+ * interfaces, {@code IgniteThrowableConsumer} is expected to operate via side-effects.
+ *
+ * @param <E> Type of closure parameter.
+ */
+public interface IgniteThrowableConsumer<E> extends Serializable {
+ /**
+ * Consumer body.
+ *
+ * @param e Consumer parameter.
+ * @throws IgniteCheckedException if body execution failed.
+ */
+ public void accept(E e) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableRunner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableRunner.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableRunner.java
new file mode 100644
index 0000000..a5c95e1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableRunner.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.lang;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Represents a throwable runner.
+ */
+public interface IgniteThrowableRunner {
+ /**
+ * Execute a body.
+ */
+ void run() throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartInParallelTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartInParallelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartInParallelTest.java
new file mode 100644
index 0000000..4e30d1c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartInParallelTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class CacheStartInParallelTest extends GridCommonAbstractTest {
+ /** */
+ private static final int CACHES_COUNT = 40;
+
+ /** */
+ private static final String STATIC_CACHE_PREFIX = "static-cache-";
+
+ /** */
+ private static final String DYNAMIC_CACHE_PREFIX = "dynamic-cache-";
+
+ /** */
+ private static boolean isStaticCache = true;
+
+ /** */
+ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setSystemThreadPoolSize(10);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ long sz = 100 * 1024 * 1024;
+
+ DataStorageConfiguration memCfg = new DataStorageConfiguration().setPageSize(1024)
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(true).setInitialSize(sz).setMaxSize(sz))
+ .setWalMode(WALMode.LOG_ONLY).setCheckpointFrequency(24L * 60 * 60 * 1000);
+
+ cfg.setDataStorageConfiguration(memCfg);
+
+ if (isStaticCache) {
+ ArrayList<Object> staticCaches = new ArrayList<>(CACHES_COUNT);
+
+ for (int i = 0; i < CACHES_COUNT; i++)
+ staticCaches.add(cacheConfiguration(STATIC_CACHE_PREFIX + i));
+
+ cfg.setCacheConfiguration(staticCaches.toArray(new CacheConfiguration[CACHES_COUNT]));
+ }
+
+ return cfg;
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration(String cacheName) {
+ CacheConfiguration cfg = defaultCacheConfiguration();
+
+ cfg.setName(cacheName);
+ cfg.setBackups(1);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanupTestData();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ cleanupTestData();
+ }
+
+ /** */
+ private void cleanupTestData() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ System.clearProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL);
+
+ isStaticCache = true;
+ }
+
+ /**
+ * Checks that starting static caches in parallel is faster than starting them sequentially.
+ *
+ * @throws Exception if fail.
+ */
+ public void testParallelizationAcceleratesStartOfStaticCaches() throws Exception {
+ //start caches consistently.
+ System.setProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, "false");
+
+ long startTime = System.currentTimeMillis();
+
+ IgniteEx igniteEx = startGrid(0);
+
+ igniteEx.cluster().active(true);
+
+ long totalStartTimeConsistently = System.currentTimeMillis() - startTime;
+
+ //check cache started.
+ for (int i = 0; i < CACHES_COUNT; i++)
+ igniteEx.cache(STATIC_CACHE_PREFIX + i).put(i, i);
+
+ stopAllGrids();
+
+ //start caches in parallel.
+ System.setProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, "true");
+
+ startTime = System.currentTimeMillis();
+
+ igniteEx = startGrid(0);
+
+ igniteEx.cluster().active(true);
+
+ long totalStartTimeInParallel = System.currentTimeMillis() - startTime;
+
+ for (int i = 0; i < CACHES_COUNT; i++)
+ igniteEx.cache(STATIC_CACHE_PREFIX + i).put(i, i);
+
+ stopAllGrids();
+
+ assertTrue("Consistently cache stat time : " + totalStartTimeConsistently +
+ "Parallelization cache stat time : " + totalStartTimeInParallel,
+ totalStartTimeConsistently > totalStartTimeInParallel);
+ }
+
+ /**
+ * Checks that starting dynamic caches in parallel is faster than starting them sequentially.
+ *
+ * @throws Exception if fail.
+ */
+ public void testParallelizationAcceleratesStartOfCaches2() throws Exception {
+ //prepare dynamic caches.
+ isStaticCache = false;
+
+ IgniteEx igniteEx = startGrid(0);
+
+ igniteEx.cluster().active(true);
+
+ for (int i = 0; i < CACHES_COUNT; i++)
+ igniteEx.getOrCreateCache(DYNAMIC_CACHE_PREFIX + i);
+
+ stopAllGrids();
+
+ //start caches consistently.
+ System.setProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, "false");
+
+ igniteEx = startGrid(0);
+ long startTime = System.currentTimeMillis();
+
+ igniteEx.cluster().active(true);
+
+ long totalStartTimeConsistently = System.currentTimeMillis() - startTime;
+
+ for (int i = 0; i < CACHES_COUNT; i++)
+ igniteEx.cache(DYNAMIC_CACHE_PREFIX + i);
+
+ stopAllGrids();
+
+ //start caches in parallel.
+ System.setProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, "true");
+
+ startTime = System.currentTimeMillis();
+
+ igniteEx = startGrid(0);
+
+ igniteEx.cluster().active(true);
+
+ long totalStartTimeInParallel = System.currentTimeMillis() - startTime;
+
+ for (int i = 0; i < CACHES_COUNT; i++)
+ igniteEx.cache(DYNAMIC_CACHE_PREFIX + i).put(i, i);
+
+ stopAllGrids();
+
+ assertTrue("Consistently cache stat time : " + totalStartTimeConsistently +
+ "Parallelization cache stat time : " + totalStartTimeInParallel,
+ totalStartTimeConsistently > totalStartTimeInParallel);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
index 870ce67..66453b8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
@@ -17,15 +17,15 @@
package org.apache.ignite.internal.processors.cache.distributed;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
-import javax.cache.Cache;
-import javax.cache.configuration.Factory;
-import javax.cache.integration.CacheLoaderException;
-import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.CacheStore;
@@ -61,7 +61,7 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
CacheConfiguration cfg3 = cacheConfiguration("cacheC", new SecondStoreFactory());
CacheConfiguration cfg4 = cacheConfiguration("cacheD", null);
- cfg.setCacheConfiguration(cfg1, cfg2, cfg3, cfg4);
+ cfg.setCacheConfiguration(cfg4, cfg2, cfg3, cfg1);
return cfg;
}
@@ -92,6 +92,8 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
+ cleanPersistenceDir();
+
startGrids(4);
}
@@ -160,8 +162,8 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
"delete cacheA",
"write cacheB",
"sessionEnd true"
- ),
- firstStoreEvts);
+ ),
+ firstStoreEvts);
assertEquals(0, secondStoreEvts.size());
}
@@ -209,16 +211,16 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
"write cacheA",
"delete cacheA",
"sessionEnd true"
- ),
- firstStoreEvts);
+ ),
+ firstStoreEvts);
assertEqualsCollections(F.asList(
"writeAll cacheC 2",
"deleteAll cacheC 2",
"write cacheC",
"sessionEnd true"
- ),
- secondStoreEvts);
+ ),
+ secondStoreEvts);
}
/**
@@ -264,8 +266,8 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
"write cacheA",
"delete cacheA",
"sessionEnd true"
- ),
- firstStoreEvts);
+ ),
+ firstStoreEvts);
assertEquals(0, secondStoreEvts.size());
}
@@ -361,15 +363,10 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
private Ignite ignite;
/** {@inheritDoc} */
- @Override public CacheStore create() {
+ @Override public synchronized CacheStore create() {
String igniteInstanceName = ignite.name();
- CacheStore store = firstStores.get(igniteInstanceName);
-
- if (store == null)
- store = F.addIfAbsent(firstStores, igniteInstanceName, new TestStore());
-
- return store;
+ return firstStores.computeIfAbsent(igniteInstanceName, (k) -> new TestStore());
}
}
@@ -384,12 +381,7 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
@Override public CacheStore create() {
String igniteInstanceName = ignite.name();
- CacheStore store = secondStores.get(igniteInstanceName);
-
- if (store == null)
- store = F.addIfAbsent(secondStores, igniteInstanceName, new TestStore());
-
- return store;
+ return secondStores.computeIfAbsent(igniteInstanceName, (k) -> new TestStore());
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 61a076e..13a1044 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -44,6 +44,10 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterGroup;
@@ -877,6 +881,76 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
}
/**
+ * Checks that doInParallel runs three tasks concurrently: each awaits a shared 3-party barrier for up to 1 s.
+ */
+ public void testDoInParallel() throws Throwable {
+ CyclicBarrier barrier = new CyclicBarrier(3);
+
+ IgniteUtils.doInParallel(3,
+ Executors.newFixedThreadPool(3),
+ Arrays.asList(1, 2, 3),
+ i -> {
+ try {
+ barrier.await(1, TimeUnit.SECONDS);
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+ );
+ }
+
+ /**
+ * Expects doInParallel with first argument 2 to fail with a TimeoutException cause: three tasks await a 3-party barrier (400 ms).
+ */
+ public void testDoInParallelBatch() {
+ CyclicBarrier barrier = new CyclicBarrier(3);
+
+ try {
+ IgniteUtils.doInParallel(2,
+ Executors.newFixedThreadPool(3),
+ Arrays.asList(1, 2, 3),
+ i -> {
+ try {
+ barrier.await(400, TimeUnit.MILLISECONDS);
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+ );
+
+ fail("Should throw timeout exception");
+ }
+ catch (Exception e) {
+ assertTrue(e.getCause() instanceof TimeoutException);
+ }
+ }
+
+ /**
+ * Checks that an IgniteCheckedException thrown by a task reaches the doInParallel caller with its message intact.
+ */
+ public void testDoInParallelException() {
+ String expectedException = "ExpectedException";
+
+ try {
+ IgniteUtils.doInParallel(3,
+ Executors.newFixedThreadPool(1),
+ Arrays.asList(1, 2, 3),
+ i -> {
+ if (i == 1)
+ throw new IgniteCheckedException(expectedException);
+ }
+ );
+
+ fail("Should throw ParallelExecutionException");
+ }
+ catch (IgniteCheckedException e) {
+ assertEquals(expectedException, e.getMessage());
+ }
+ }
+
+ /**
* Test enum.
*/
private enum TestEnum {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1f8f46f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
index d0734a8..0381a1f 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.distributed.Cache64kPartition
import org.apache.ignite.internal.processors.cache.distributed.CacheDataLossOnPartitionMoveTest;
import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheRentingStateRepairTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheStartInParallelTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheStartWithLoadTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingPartitionCountersTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingWithAsyncClearingTest;
@@ -99,6 +100,8 @@ public class IgniteCacheTestSuite7 extends TestSuite {
suite.addTestSuite(CacheRentingStateRepairTest.class);
+ suite.addTestSuite(CacheStartInParallelTest.class);
+
suite.addTestSuite(TransactionIntegrityWithPrimaryIndexCorruptionTest.class);
suite.addTestSuite(CacheDataLossOnPartitionMoveTest.class);