You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/30 06:09:10 UTC

[19/28] ignite git commit: IGNITE-9682 Update partition full map in parallel - Fixes #4824.

IGNITE-9682 Update partition full map in parallel - Fixes #4824.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/862c9264
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/862c9264
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/862c9264

Branch: refs/heads/ignite-627
Commit: 862c9264fc05816f96ea594807854502ff3dd00a
Parents: 2906a16
Author: Oleg Ostanin <oo...@gridgain.com>
Authored: Mon Oct 29 13:28:07 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Oct 29 13:28:07 2018 +0300

----------------------------------------------------------------------
 .../GridDhtPartitionsExchangeFuture.java        | 66 ++++++++++++--------
 1 file changed, 39 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/862c9264/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 0fc9c24..9314096 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -130,6 +130,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
 import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverJoinEvent;
 import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverLeftEvent;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE;
+import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;
 
 /**
  * Future for exchanging partition maps.
@@ -3948,39 +3949,50 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         long time = System.currentTimeMillis();
 
-        for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
-            Integer grpId = entry.getKey();
+        int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize();
 
-            CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+        // Leave 2 threads of the system pool for other system operations, but keep the parallelism level at 1 or more.
+        parallelismLvl = Math.max(1, parallelismLvl - 2);
 
-            if (grp != null) {
-                CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId,
-                    grp.topology().partitions());
+        try {
+            doInParallel(
+                parallelismLvl,
+                cctx.kernalContext().getSystemExecutorService(),
+                msg.partitions().keySet(), grpId -> {
+                    CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                grp.topology().update(resTopVer,
-                    entry.getValue(),
-                    cntrMap,
-                    msg.partsToReload(cctx.localNodeId(), grpId),
-                    msg.partitionSizes(grpId),
-                    null);
-            }
-            else {
-                ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
+                    if (grp != null) {
+                        CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId,
+                            grp.topology().partitions());
 
-                if (oldest != null && oldest.isLocal()) {
-                    GridDhtPartitionTopology top = cctx.exchange().clientTopology(grpId, events().discoveryCache());
+                        grp.topology().update(resTopVer,
+                            msg.partitions().get(grpId),
+                            cntrMap,
+                            msg.partsToReload(cctx.localNodeId(), grpId),
+                            msg.partitionSizes(grpId),
+                            null);
+                    }
+                    else {
+                        ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
 
-                    CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId,
-                        top.partitions());
+                        if (oldest != null && oldest.isLocal()) {
+                            GridDhtPartitionTopology top = cctx.exchange().clientTopology(grpId, events().discoveryCache());
 
-                    top.update(resTopVer,
-                        entry.getValue(),
-                        cntrMap,
-                        Collections.emptySet(),
-                        null,
-                        null);
-                }
-            }
+                            CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId,
+                                top.partitions());
+
+                            top.update(resTopVer,
+                                msg.partitions().get(grpId),
+                                cntrMap,
+                                Collections.emptySet(),
+                                null,
+                                null);
+                        }
+                    }
+                });
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
         }
 
         partitionsReceived = true;