You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/30 06:09:10 UTC
[19/28] ignite git commit: IGNITE-9682 Update partition full map in parallel - Fixes #4824.
IGNITE-9682 Update partition full map in parallel - Fixes #4824.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/862c9264
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/862c9264
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/862c9264
Branch: refs/heads/ignite-627
Commit: 862c9264fc05816f96ea594807854502ff3dd00a
Parents: 2906a16
Author: Oleg Ostanin <oo...@gridgain.com>
Authored: Mon Oct 29 13:28:07 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Oct 29 13:28:07 2018 +0300
----------------------------------------------------------------------
.../GridDhtPartitionsExchangeFuture.java | 66 ++++++++++++--------
1 file changed, 39 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/862c9264/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 0fc9c24..9314096 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -130,6 +130,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverJoinEvent;
import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverLeftEvent;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE;
+import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;
/**
* Future for exchanging partition maps.
@@ -3948,39 +3949,50 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
long time = System.currentTimeMillis();
- for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
- Integer grpId = entry.getKey();
+ int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize();
- CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+ // Reserve at least 2 threads for system operations.
+ parallelismLvl = Math.max(1, parallelismLvl - 2);
- if (grp != null) {
- CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId,
- grp.topology().partitions());
+ try {
+ doInParallel(
+ parallelismLvl,
+ cctx.kernalContext().getSystemExecutorService(),
+ msg.partitions().keySet(), grpId -> {
+ CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
- grp.topology().update(resTopVer,
- entry.getValue(),
- cntrMap,
- msg.partsToReload(cctx.localNodeId(), grpId),
- msg.partitionSizes(grpId),
- null);
- }
- else {
- ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
+ if (grp != null) {
+ CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId,
+ grp.topology().partitions());
- if (oldest != null && oldest.isLocal()) {
- GridDhtPartitionTopology top = cctx.exchange().clientTopology(grpId, events().discoveryCache());
+ grp.topology().update(resTopVer,
+ msg.partitions().get(grpId),
+ cntrMap,
+ msg.partsToReload(cctx.localNodeId(), grpId),
+ msg.partitionSizes(grpId),
+ null);
+ }
+ else {
+ ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
- CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId,
- top.partitions());
+ if (oldest != null && oldest.isLocal()) {
+ GridDhtPartitionTopology top = cctx.exchange().clientTopology(grpId, events().discoveryCache());
- top.update(resTopVer,
- entry.getValue(),
- cntrMap,
- Collections.emptySet(),
- null,
- null);
- }
- }
+ CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId,
+ top.partitions());
+
+ top.update(resTopVer,
+ msg.partitions().get(grpId),
+ cntrMap,
+ Collections.emptySet(),
+ null,
+ null);
+ }
+ }
+ });
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
}
partitionsReceived = true;