You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/02/04 11:44:09 UTC
[ignite] branch master updated: IGNITE-10876 Parallel execution of
affinity changes on coordinator - Fixes #5942.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 51accac IGNITE-10876 Parallel execution of affinity changes on coordinator - Fixes #5942.
51accac is described below
commit 51accac9f748689be72bfbce390a19fb8a6896f8
Author: Pavel Voronkin <pv...@gridgain.com>
AuthorDate: Mon Feb 4 13:48:16 2019 +0300
IGNITE-10876 Parallel execution of affinity changes on coordinator - Fixes #5942.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../cache/GridCachePartitionExchangeManager.java | 17 --
.../dht/preloader/CacheGroupAffinityMessage.java | 25 ++-
.../preloader/GridDhtPartitionsExchangeFuture.java | 193 +++++++++++++--------
.../dht/topology/GridDhtPartitionTopologyImpl.java | 139 +++++++--------
.../cache/distributed/CacheParallelStartTest.java | 54 +++---
5 files changed, 232 insertions(+), 196 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 525c41a..71a704c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -200,9 +200,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** */
private GridFutureAdapter<?> reconnectExchangeFut;
- /** */
- private final Object interruptLock = new Object();
-
/**
* Partition map futures.
* This set also contains already completed exchange futures to address race conditions when coordinator
@@ -818,13 +815,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @return Interrupt lock.
- */
- public Object interruptLock() {
- return interruptLock;
- }
-
- /**
* @param grpId Cache group ID.
* @return Topology.
*/
@@ -2639,13 +2629,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
- /** {@inheritDoc} */
- @Override public void cancel() {
- synchronized (interruptLock) {
- super.cancel();
- }
- }
-
/**
* Add custom exchange task.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
index 695eadc..ef56834 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
@@ -135,25 +136,23 @@ public class CacheGroupAffinityMessage implements Message {
}
/**
+ * Fill Map of CacheGroupAffinityMessages.
+ *
* @param cctx Context.
* @param topVer Topology version.
* @param affReq Cache group IDs.
* @param cachesAff Optional already prepared affinity.
- * @return Affinity.
*/
- static Map<Integer, CacheGroupAffinityMessage> createAffinityMessages(
+ static void createAffinityMessages(
GridCacheSharedContext cctx,
AffinityTopologyVersion topVer,
Collection<Integer> affReq,
- @Nullable Map<Integer, CacheGroupAffinityMessage> cachesAff
+ Map<Integer, CacheGroupAffinityMessage> cachesAff
) {
assert !F.isEmpty(affReq) : affReq;
- if (cachesAff == null)
- cachesAff = U.newHashMap(affReq.size());
-
for (Integer grpId : affReq) {
- if (!cachesAff.containsKey(grpId)) {
+ cachesAff.computeIfAbsent(grpId, (integer) -> {
GridAffinityAssignmentCache aff = cctx.affinity().groupAffinity(grpId);
// If no coordinator group holder on the node, try fetch affinity from existing cache group.
@@ -169,15 +168,13 @@ public class CacheGroupAffinityMessage implements Message {
List<List<ClusterNode>> assign = aff.readyAssignments(topVer);
- CacheGroupAffinityMessage msg = new CacheGroupAffinityMessage(assign,
+ return new CacheGroupAffinityMessage(
+ assign,
aff.centralizedAffinityFunction() ? aff.idealAssignment() : null,
- null);
-
- cachesAff.put(grpId, msg);
- }
+ null
+ );
+ });
}
-
- return cachesAff;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 5cfa56e..f4043ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -3108,25 +3109,31 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param resTopVer Result topology version.
*/
private void detectLostPartitions(AffinityTopologyVersion resTopVer) {
- boolean detected = false;
+ AtomicInteger detected = new AtomicInteger();
- synchronized (cctx.exchange().interruptLock()) {
- if (Thread.currentThread().isInterrupted())
- return;
-
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (!grp.isLocal()) {
- // Do not trigger lost partition events on start.
- boolean event = !localJoinExchange() && !activateCluster();
+ try {
+ // Reserve at least 2 threads for system operations.
+ U.doInParallel(
+ U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2),
+ cctx.kernalContext().getSystemExecutorService(),
+ cctx.cache().cacheGroups(),
+ grp -> {
+ if (!grp.isLocal()) {
+ // Do not trigger lost partition events on start.
+ boolean evt = !localJoinExchange() && !activateCluster();
- boolean detectedOnGrp = grp.topology().detectLostPartitions(resTopVer, event ? events().lastEvent() : null);
+ if (grp.topology().detectLostPartitions(resTopVer, evt ? events().lastEvent() : null))
+ detected.incrementAndGet();
+ }
- detected |= detectedOnGrp;
- }
- }
+ return null;
+ });
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
}
- if (detected) {
+ if (detected.get() > 0) {
if (log.isDebugEnabled())
log.debug("Partitions have been scheduled to resend [reason=" +
"Lost partitions detect on " + resTopVer + "]");
@@ -3143,22 +3150,29 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private void resetLostPartitions(Collection<String> cacheNames) {
assert !exchCtx.mergeExchanges();
- synchronized (cctx.exchange().interruptLock()) {
- if (Thread.currentThread().isInterrupted())
- return;
-
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (grp.isLocal())
- continue;
+ try {
+ // Reserve at least 2 threads for system operations.
+ U.doInParallel(
+ U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2),
+ cctx.kernalContext().getSystemExecutorService(),
+ cctx.cache().cacheGroups(),
+ grp -> {
+ if (grp.isLocal())
+ return null;
- for (String cacheName : cacheNames) {
- if (grp.hasCache(cacheName)) {
- grp.topology().resetLostPartitions(initialVersion());
+ for (String cacheName : cacheNames) {
+ if (grp.hasCache(cacheName)) {
+ grp.topology().resetLostPartitions(initialVersion());
- break;
+ break;
+ }
}
- }
- }
+
+ return null;
+ });
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
}
}
@@ -3295,6 +3309,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Map<Integer, CacheGroupAffinityMessage> idealAffDiff = null;
+ // Reserve at least 2 threads for system operations.
+ int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
+
if (exchCtx.mergeExchanges()) {
synchronized (mux) {
if (mergedJoinExchMsgs != null) {
@@ -3315,51 +3332,65 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
else
cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, true);
- for (CacheGroupDescriptor desc : cctx.affinity().cacheGroups().values()) {
- if (desc.config().getCacheMode() == CacheMode.LOCAL)
- continue;
- CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
+ U.doInParallel(
+ parallelismLvl,
+ cctx.kernalContext().getSystemExecutorService(),
+ cctx.affinity().cacheGroups().values(),
+ desc -> {
+ if (desc.config().getCacheMode() == CacheMode.LOCAL)
+ return null;
- GridDhtPartitionTopology top = grp != null ? grp.topology() :
- cctx.exchange().clientTopology(desc.groupId(), events().discoveryCache());
+ CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
- top.beforeExchange(this, true, true);
- }
+ GridDhtPartitionTopology top = grp != null ? grp.topology() :
+ cctx.exchange().clientTopology(desc.groupId(), events().discoveryCache());
+
+ top.beforeExchange(this, true, true);
+
+ return null;
+ });
}
timeBag.finishGlobalStage("Affinity recalculation (crd)");
- Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
+ Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = new ConcurrentHashMap<>(cctx.cache().cacheGroups().size());
- for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
- GridDhtPartitionsSingleMessage msg = e.getValue();
+ U.doInParallel(
+ parallelismLvl,
+ cctx.kernalContext().getSystemExecutorService(),
+ msgs.entrySet(),
+ entry -> {
+ GridDhtPartitionsSingleMessage msg = entry.getValue();
- // Apply update counters after all single messages are received.
- for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
- Integer grpId = entry.getKey();
+ for (Map.Entry<Integer, GridDhtPartitionMap> e : msg.partitions().entrySet()) {
+ Integer grpId = e.getKey();
- CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+ CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
- GridDhtPartitionTopology top = grp != null ? grp.topology() :
- cctx.exchange().clientTopology(grpId, events().discoveryCache());
+ GridDhtPartitionTopology top = grp != null
+ ? grp.topology()
+ : cctx.exchange().clientTopology(grpId, events().discoveryCache());
- CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId,
- top.partitions());
+ CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId, top.partitions());
- if (cntrs != null)
- top.collectUpdateCounters(cntrs);
- }
+ if (cntrs != null)
+ top.collectUpdateCounters(cntrs);
+ }
- Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+ Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
- if (affReq != null) {
- joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx,
- resTopVer,
- affReq,
- joinedNodeAff);
+ if (affReq != null)
+ CacheGroupAffinityMessage.createAffinityMessages(
+ cctx,
+ resTopVer,
+ affReq,
+ joinedNodeAff
+ );
+
+ return null;
}
- }
+ );
timeBag.finishGlobalStage("Collect update counters and create affinity messages");
@@ -3708,14 +3739,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+
if (affReq != null) {
- Map<Integer, CacheGroupAffinityMessage> aff = CacheGroupAffinityMessage.createAffinityMessages(
+ Map<Integer, CacheGroupAffinityMessage> cachesAff = U.newHashMap(affReq.size());
+
+ CacheGroupAffinityMessage.createAffinityMessages(
cctx,
finishState.resTopVer,
affReq,
- null);
+ cachesAff);
- fullMsg.joinedNodeAffinity(aff);
+ fullMsg.joinedNodeAffinity(cachesAff);
}
if (!fullMsg.exchangeId().equals(msg.exchangeId())) {
@@ -4572,21 +4606,38 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs = newCrdFut.messages();
if (!F.isEmpty(msgs)) {
- Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
+ Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = new ConcurrentHashMap<>();
- for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
- this.msgs.put(e.getKey().id(), e.getValue());
+ // Reserve at least 2 threads for system operations.
+ int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
- GridDhtPartitionsSingleMessage msg = e.getValue();
-
- Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+ try {
+ U.doInParallel(
+ parallelismLvl,
+ cctx.kernalContext().getSystemExecutorService(),
+ msgs.entrySet(),
+ entry -> {
+ this.msgs.put(entry.getKey().id(), entry.getValue());
+
+ GridDhtPartitionsSingleMessage msg = entry.getValue();
+
+ Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+
+ if (!F.isEmpty(affReq)) {
+ CacheGroupAffinityMessage.createAffinityMessages(
+ cctx,
+ fullMsg.resultTopologyVersion(),
+ affReq,
+ joinedNodeAff
+ );
+ }
- if (!F.isEmpty(affReq)) {
- joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx,
- fullMsg.resultTopologyVersion(),
- affReq,
- joinedNodeAff);
- }
+ return null;
+ }
+ );
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
}
Map<UUID, GridDhtPartitionsSingleMessage> mergedJoins = newCrdFut.mergedJoinExchangeMessages();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index b0decae..ba11df7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -505,108 +505,103 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
ctx.database().checkpointReadLock();
try {
- synchronized (ctx.exchange().interruptLock()) {
- if (Thread.currentThread().isInterrupted())
- throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread());
-
- U.writeLock(lock);
+ U.writeLock(lock);
- try {
- if (stopping)
- return;
+ try {
+ if (stopping)
+ return;
- assert lastTopChangeVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + lastTopChangeVer +
- ", exchId=" + exchFut.exchangeId() + ']';
+ assert lastTopChangeVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + lastTopChangeVer +
+ ", exchId=" + exchFut.exchangeId() + ']';
- ExchangeDiscoveryEvents evts = exchFut.context().events();
+ ExchangeDiscoveryEvents evts = exchFut.context().events();
- if (affReady) {
- assert grp.affinity().lastVersion().equals(evts.topologyVersion()) : "Invalid affinity version [" +
- "grp=" + grp.cacheOrGroupName() +
- ", affVer=" + grp.affinity().lastVersion() +
- ", evtsVer=" + evts.topologyVersion() + ']';
+ if (affReady) {
+ assert grp.affinity().lastVersion().equals(evts.topologyVersion()) : "Invalid affinity version [" +
+ "grp=" + grp.cacheOrGroupName() +
+ ", affVer=" + grp.affinity().lastVersion() +
+ ", evtsVer=" + evts.topologyVersion() + ']';
- lastTopChangeVer = readyTopVer = evts.topologyVersion();
+ lastTopChangeVer = readyTopVer = evts.topologyVersion();
- discoCache = evts.discoveryCache();
- }
+ discoCache = evts.discoveryCache();
+ }
- if (log.isDebugEnabled()) {
- log.debug("Partition map beforeExchange [grp=" + grp.cacheOrGroupName() +
- ", exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']');
- }
+ if (log.isDebugEnabled()) {
+ log.debug("Partition map beforeExchange [grp=" + grp.cacheOrGroupName() +
+ ", exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']');
+ }
- long updateSeq = this.updateSeq.incrementAndGet();
+ long updateSeq = this.updateSeq.incrementAndGet();
- cntrMap.clear();
+ cntrMap.clear();
- initializeFullMap(updateSeq);
+ initializeFullMap(updateSeq);
- boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
+ boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
- if (evts.hasServerLeft()) {
- List<DiscoveryEvent> evts0 = evts.events();
+ if (evts.hasServerLeft()) {
+ List<DiscoveryEvent> evts0 = evts.events();
- for (int i = 0; i < evts0.size(); i++) {
- DiscoveryEvent evt = evts0.get(i);
+ for (int i = 0; i < evts0.size(); i++) {
+ DiscoveryEvent evt = evts0.get(i);
- if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
- removeNode(evt.eventNode().id());
- }
+ if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
+ removeNode(evt.eventNode().id());
}
+ }
- if (grp.affinityNode()) {
- if (grpStarted ||
- exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT ||
- exchFut.serverNodeDiscoveryEvent()) {
-
- AffinityTopologyVersion affVer;
- List<List<ClusterNode>> affAssignment;
+ if (grp.affinityNode()) {
+ if (grpStarted ||
+ exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT ||
+ exchFut.serverNodeDiscoveryEvent()) {
- if (affReady) {
- affVer = evts.topologyVersion();
+ AffinityTopologyVersion affVer;
+ List<List<ClusterNode>> affAssignment;
- assert grp.affinity().lastVersion().equals(affVer) :
- "Invalid affinity [topVer=" + grp.affinity().lastVersion() +
- ", grp=" + grp.cacheOrGroupName() +
- ", affVer=" + affVer +
- ", fut=" + exchFut + ']';
+ if (affReady) {
+ affVer = evts.topologyVersion();
- affAssignment = grp.affinity().readyAssignments(affVer);
- }
- else {
- assert !exchFut.context().mergeExchanges();
+ assert grp.affinity().lastVersion().equals(affVer) :
+ "Invalid affinity [topVer=" + grp.affinity().lastVersion() +
+ ", grp=" + grp.cacheOrGroupName() +
+ ", affVer=" + affVer +
+ ", fut=" + exchFut + ']';
- affVer = exchFut.initialVersion();
- affAssignment = grp.affinity().idealAssignment();
- }
+ affAssignment = grp.affinity().readyAssignments(affVer);
+ }
+ else {
+ assert !exchFut.context().mergeExchanges();
- initPartitions(affVer, affAssignment, exchFut, updateSeq);
+ affVer = exchFut.initialVersion();
+ affAssignment = grp.affinity().idealAssignment();
}
- }
- consistencyCheck();
+ initPartitions(affVer, affAssignment, exchFut, updateSeq);
+ }
+ }
- if (updateMoving) {
- assert grp.affinity().lastVersion().equals(evts.topologyVersion());
+ consistencyCheck();
- createMovingPartitions(grp.affinity().readyAffinity(evts.topologyVersion()));
- }
+ if (updateMoving) {
+ assert grp.affinity().lastVersion().equals(evts.topologyVersion());
- if (log.isDebugEnabled()) {
- log.debug("Partition map after beforeExchange [grp=" + grp.cacheOrGroupName() + ", " +
- "exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']');
- }
+ createMovingPartitions(grp.affinity().readyAffinity(evts.topologyVersion()));
+ }
- if (log.isTraceEnabled()) {
- log.trace("Partition states after beforeExchange [grp=" + grp.cacheOrGroupName()
- + ", exchId=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']');
- }
+ if (log.isDebugEnabled()) {
+ log.debug("Partition map after beforeExchange [grp=" + grp.cacheOrGroupName() + ", " +
+ "exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']');
}
- finally {
- lock.writeLock().unlock();
+
+ if (log.isTraceEnabled()) {
+ log.trace("Partition states after beforeExchange [grp=" + grp.cacheOrGroupName()
+ + ", exchId=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']');
}
}
+ finally {
+ lock.writeLock().unlock();
+ }
}
finally {
ctx.database().checkpointReadUnlock();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheParallelStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheParallelStartTest.java
index f8ce7b4..d01d822 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheParallelStartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheParallelStartTest.java
@@ -38,7 +38,11 @@ import org.junit.Test;
*/
public class CacheParallelStartTest extends GridCommonAbstractTest {
/** */
- private static final int CACHES_COUNT = 500;
+ private static final int CACHES_COUNT = 5000;
+
+ /** */
+ private static final int GROUPS_COUNT = 50;
+
/** */
private static final String STATIC_CACHE_PREFIX = "static-cache-";
@@ -54,11 +58,15 @@ public class CacheParallelStartTest extends GridCommonAbstractTest {
cfg.setSystemThreadPoolSize(Runtime.getRuntime().availableProcessors() * 3);
- long sz = 100 * 1024 * 1024;
+ long sz = 512 * 1024 * 1024;
DataStorageConfiguration memCfg = new DataStorageConfiguration().setPageSize(1024)
.setDefaultDataRegionConfiguration(
- new DataRegionConfiguration().setPersistenceEnabled(false).setInitialSize(sz).setMaxSize(sz))
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(false)
+ .setInitialSize(sz)
+ .setMaxSize(sz)
+ )
.setWalMode(WALMode.LOG_ONLY).setCheckpointFrequency(24L * 60 * 60 * 1000);
cfg.setDataStorageConfiguration(memCfg);
@@ -66,7 +74,7 @@ public class CacheParallelStartTest extends GridCommonAbstractTest {
ArrayList<Object> staticCaches = new ArrayList<>(CACHES_COUNT);
for (int i = 0; i < CACHES_COUNT; i++)
- staticCaches.add(cacheConfiguration(STATIC_CACHE_PREFIX + i));
+ staticCaches.add(cacheConfiguration(STATIC_CACHE_PREFIX, i));
cfg.setCacheConfiguration(staticCaches.toArray(new CacheConfiguration[CACHES_COUNT]));
@@ -77,12 +85,12 @@ public class CacheParallelStartTest extends GridCommonAbstractTest {
* @param cacheName Cache name.
* @return Cache configuration.
*/
- private CacheConfiguration cacheConfiguration(String cacheName) {
+ private CacheConfiguration cacheConfiguration(String cacheName, int i) {
CacheConfiguration cfg = defaultCacheConfiguration();
- cfg.setName(cacheName);
+ cfg.setName(cacheName + i);
cfg.setBackups(1);
- cfg.setGroupName(STATIC_CACHE_CACHE_GROUP_NAME);
+ cfg.setGroupName(STATIC_CACHE_CACHE_GROUP_NAME + i % GROUPS_COUNT);
cfg.setIndexedTypes(Long.class, Long.class);
return cfg;
@@ -175,20 +183,22 @@ public class CacheParallelStartTest extends GridCommonAbstractTest {
*
*/
private void assertCaches(IgniteEx igniteEx) {
- Collection<GridCacheContext> caches = igniteEx
- .context()
- .cache()
- .cacheGroup(CU.cacheId(STATIC_CACHE_CACHE_GROUP_NAME))
- .caches();
-
- assertEquals(CACHES_COUNT, caches.size());
-
- @Nullable CacheGroupContext cacheGroup = igniteEx
- .context()
- .cache()
- .cacheGroup(CU.cacheId(STATIC_CACHE_CACHE_GROUP_NAME));
-
- for (GridCacheContext cacheContext : caches)
- assertEquals(cacheContext.group(), cacheGroup);
+ for (int i = 0; i < GROUPS_COUNT; i++) {
+ Collection<GridCacheContext> caches = igniteEx
+ .context()
+ .cache()
+ .cacheGroup(CU.cacheId(STATIC_CACHE_CACHE_GROUP_NAME + i))
+ .caches();
+
+ assertEquals(CACHES_COUNT / GROUPS_COUNT, caches.size());
+
+ @Nullable CacheGroupContext cacheGrp = igniteEx
+ .context()
+ .cache()
+ .cacheGroup(CU.cacheId(STATIC_CACHE_CACHE_GROUP_NAME + i));
+
+ for (GridCacheContext cacheContext : caches)
+ assertEquals(cacheContext.group(), cacheGrp);
+ }
}
}