Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/23 15:01:12 UTC

[08/50] [abbrv] ignite git commit: IGNITE-9868 Improved background full message sending - Fixes #4975.

IGNITE-9868 Improved background full message sending - Fixes #4975.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ce73c9d8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ce73c9d8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ce73c9d8

Branch: refs/heads/ignite-gg-14206
Commit: ce73c9d85ffd5b50e4c3370b13302605392fa572
Parents: aec3f91
Author: Sergey Antonov <an...@gmail.com>
Authored: Wed Oct 17 20:08:34 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Oct 17 20:18:36 2018 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 177 ++++++++++++++-----
 .../GridDhtPartitionsExchangeFuture.java        |  12 +-
 2 files changed, 135 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
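
The core of the change: GridCachePartitionExchangeManager.refreshPartitions()
gains an overload that takes an explicit collection of cache groups, and the
old no-arg entry point delegates to it with all groups. Below is a minimal,
self-contained Java sketch of that shape; the class and group names are
illustrative stand-ins, not the real Ignite types.

import java.util.Arrays;
import java.util.Collection;

public class SelectiveRefreshSketch {
    private final Collection<String> allGroups = Arrays.asList("groupA", "groupB");

    /** Old no-arg entry point, kept as-is: refreshes every cache group. */
    public void refreshPartitions() {
        refreshPartitions(allGroups);
    }

    /** New overload: acts only on the groups the caller selected. */
    public void refreshPartitions(Collection<String> grps) {
        if (grps.isEmpty())
            return; // early-out mirrors the new check in the diff

        System.out.println("Refreshing partitions for: " + grps);
    }

    public static void main(String[] args) {
        new SelectiveRefreshSketch().refreshPartitions();
    }
}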


http://git-wip-us.apache.org/repos/asf/ignite/blob/ce73c9d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 0baf5a3..6af9678 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1028,11 +1028,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * Partition refresh callback.
+     * Partition refresh callback for selected cache groups.
      * On the coordinator this triggers sending of {@link GridDhtPartitionsFullMessage FullMessages};
      * on non-coordinator nodes it triggers sending of {@link GridDhtPartitionsSingleMessage SingleMessages}.
+     *
+     * @param grps Cache groups to refresh partitions for.
      */
-    public void refreshPartitions() {
+    public void refreshPartitions(@NotNull Collection<CacheGroupContext> grps) {
         // TODO https://issues.apache.org/jira/browse/IGNITE-6857
         if (cctx.snapshot().snapshotOperationInProgress()) {
             scheduleResendPartitions();
@@ -1040,6 +1042,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             return;
         }
 
+        if (grps.isEmpty()) {
+            if (log.isDebugEnabled())
+                log.debug("Skipping partitions refresh, there are no cache groups to refresh.");
+
+            return;
+        }
+
         ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
 
         if (oldest == null) {
@@ -1049,8 +1058,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             return;
         }
 
-        if (log.isDebugEnabled())
-            log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']');
+        if (log.isDebugEnabled()) {
+            log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() +
+                ", cacheGroups=" + grps + ']');
+        }
 
         // If this is the oldest node.
         if (oldest.id().equals(cctx.localNodeId())) {
@@ -1068,50 +1079,66 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             // No need to send to nodes which did not finish their first exchange.
             AffinityTopologyVersion rmtTopVer =
-                lastFut != null ? (lastFut.isDone() ? lastFut.topologyVersion() : lastFut.initialVersion()) : AffinityTopologyVersion.NONE;
+                lastFut != null ?
+                    (lastFut.isDone() ? lastFut.topologyVersion() : lastFut.initialVersion())
+                    : AffinityTopologyVersion.NONE;
 
             Collection<ClusterNode> rmts = cctx.discovery().remoteAliveNodesWithCaches(rmtTopVer);
 
             if (log.isDebugEnabled())
                 log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());
 
-            sendAllPartitions(rmts, rmtTopVer);
+            sendAllPartitions(rmts, rmtTopVer, grps);
         }
         else {
             if (log.isDebugEnabled())
                 log.debug("Refreshing local partitions from non-oldest node: " +
                     cctx.localNodeId());
 
-            sendLocalPartitions(oldest, null);
+            sendLocalPartitions(oldest, null, grps);
         }
     }
 
     /**
+     * Partition refresh callback.
+     * On the coordinator this triggers sending of {@link GridDhtPartitionsFullMessage FullMessages};
+     * on non-coordinator nodes it triggers sending of {@link GridDhtPartitionsSingleMessage SingleMessages}.
+     */
+    public void refreshPartitions() { refreshPartitions(cctx.cache().cacheGroups()); }
+
+    /**
      * @param nodes Nodes.
      * @param msgTopVer Topology version. Will be added to the full message.
+     * @param grps Selected cache groups.
      */
     private void sendAllPartitions(
         Collection<ClusterNode> nodes,
-        AffinityTopologyVersion msgTopVer
+        AffinityTopologyVersion msgTopVer,
+        Collection<CacheGroupContext> grps
     ) {
         long time = System.currentTimeMillis();
 
-        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, false, null, null, null, null);
+        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, false, null, null, null, null, grps);
 
         m.topologyVersion(msgTopVer);
 
         if (log.isInfoEnabled()) {
             long latency = System.currentTimeMillis() - time;
 
-            if (latency > 100 || log.isDebugEnabled())
-                log.info("Full Message creating for " + msgTopVer + " performed in " + latency + " ms.");
+            if (latency > 50 || log.isDebugEnabled()) {
+                log.info("Finished full message creation [msgTopVer=" + msgTopVer + ", groups=" + grps +
+                    ", latency=" + latency + "ms]");
+            }
         }
 
         if (log.isTraceEnabled())
-            log.trace("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
+            log.trace("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", cacheGroups=" + grps +
+                ", msg=" + m + ']');
 
         time = System.currentTimeMillis();
 
+        Collection<ClusterNode> failedNodes = U.newHashSet(nodes.size());
+
         for (ClusterNode node : nodes) {
             try {
                 assert !node.equals(cctx.localNode());
@@ -1119,22 +1146,34 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 cctx.io().sendNoRetry(node, m, SYSTEM_POOL);
             }
             catch (ClusterTopologyCheckedException ignore) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" +
-                        node.id() + ", msg=" + m + ']');
+                if (log.isDebugEnabled()) {
+                    log.debug("Failed to send partition update to node because it left grid (will ignore) " +
+                        "[node=" + node.id() + ", msg=" + m + ']');
+                }
             }
             catch (IgniteCheckedException e) {
-                U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']');
+                failedNodes.add(node);
+
+                U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']', e);
             }
         }
 
-        if (log.isInfoEnabled())
-            log.info("Sending Full Message for " + msgTopVer + " performed in " + (System.currentTimeMillis() - time) + " ms.");
+        if (log.isInfoEnabled()) {
+            long latency = System.currentTimeMillis() - time;
+
+            if (latency > 50 || log.isDebugEnabled()) {
+                log.info("Finished sending full message [msgTopVer=" + msgTopVer + ", groups=" + grps +
+                    (failedNodes.isEmpty() ? "" : (", skipped=" + U.nodeIds(failedNodes))) +
+                    ", latency=" + latency + "ms]");
+            }
+        }
     }
 
     /**
+     * Creates partitions full message for all cache groups.
+     *
      * @param compress {@code True} if the message may be compressed (works properly only if prepareMarshall/
-     *     finishUnmarshall methods are called).
+     * finishUnmarshall methods are called).
      * @param newCntrMap {@code True} if possible to use {@link CachePartitionFullCountersMap}.
      * @param exchId Non-null exchange ID if message is created for exchange.
      * @param lastVer Last version.
@@ -1150,18 +1189,43 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
         @Nullable IgniteDhtPartitionsToReloadMap partsToReload
     ) {
-        final GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
-            lastVer,
-            exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE,
-            partHistSuppliers,
-            partsToReload
-            );
+        Collection<CacheGroupContext> grps = cctx.cache().cacheGroups();
+
+        return createPartitionsFullMessage(compress, newCntrMap, exchId, lastVer, partHistSuppliers, partsToReload, grps);
+    }
+
+    /**
+     * Creates partitions full message for selected cache groups.
+     *
+     * @param compress {@code True} if the message may be compressed (works properly only if prepareMarshall/
+     *     finishUnmarshall methods are called).
+     * @param newCntrMap {@code True} if possible to use {@link CachePartitionFullCountersMap}.
+     * @param exchId Non-null exchange ID if message is created for exchange.
+     * @param lastVer Last version.
+     * @param partHistSuppliers Partition history suppliers map.
+     * @param partsToReload Partitions to reload map.
+     * @param grps Selected cache groups.
+     * @return Message.
+     */
+    public GridDhtPartitionsFullMessage createPartitionsFullMessage(
+        boolean compress,
+        boolean newCntrMap,
+        @Nullable final GridDhtPartitionExchangeId exchId,
+        @Nullable GridCacheVersion lastVer,
+        @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
+        @Nullable IgniteDhtPartitionsToReloadMap partsToReload,
+        Collection<CacheGroupContext> grps
+    ) {
+        AffinityTopologyVersion ver = exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE;
+
+        final GridDhtPartitionsFullMessage m =
+            new GridDhtPartitionsFullMessage(exchId, lastVer, ver, partHistSuppliers, partsToReload);
 
         m.compress(compress);
 
         final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
 
-        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+        for (CacheGroupContext grp : grps) {
             if (!grp.isLocal()) {
                 if (exchId != null) {
                     AffinityTopologyVersion startTopVer = grp.localStartVersion();
@@ -1174,14 +1238,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                 GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true);
 
-                if (locMap != null) {
-                    addFullPartitionsMap(m,
-                        dupData,
-                        compress,
-                        grp.groupId(),
-                        locMap,
-                        affCache.similarAffinityKey());
-                }
+                if (locMap != null)
+                    addFullPartitionsMap(m, dupData, compress, grp.groupId(), locMap, affCache.similarAffinityKey());
 
                 m.addPartitionSizes(grp.groupId(), grp.topology().globalPartSizes());
 
@@ -1202,14 +1260,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
             GridDhtPartitionFullMap map = top.partitionMap(true);
 
-            if (map != null) {
-                addFullPartitionsMap(m,
-                    dupData,
-                    compress,
-                    top.groupId(),
-                    map,
-                    top.similarAffinityKey());
-            }
+            if (map != null)
+                addFullPartitionsMap(m, dupData, compress, top.groupId(), map, top.similarAffinityKey());
 
             if (exchId != null) {
                 CachePartitionFullCountersMap cntrsMap = top.fullUpdateCounters();
@@ -1269,13 +1321,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /**
      * @param node Destination cluster node.
      * @param id Exchange ID.
+     * @param grps Cache groups whose partitions should be sent.
      */
-    private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
-        GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id,
-            cctx.kernalContext().clientNode(),
-            false,
-            false,
-            null);
+    private void sendLocalPartitions(
+        ClusterNode node,
+        @Nullable GridDhtPartitionExchangeId id,
+        @NotNull Collection<CacheGroupContext> grps
+    ) {
+        GridDhtPartitionsSingleMessage m =
+            createPartitionsSingleMessage(id, cctx.kernalContext().clientNode(), false, false, null, grps);
 
         if (log.isTraceEnabled())
             log.trace("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']');
@@ -1294,6 +1348,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * Creates partitions single message for all cache groups.
+     *
      * @param exchangeId Exchange ID.
      * @param clientOnlyExchange Client exchange flag.
      * @param sndCounters {@code True} if partition update counters need to be sent.
@@ -1307,6 +1363,29 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         boolean newCntrMap,
         ExchangeActions exchActions
     ) {
+        Collection<CacheGroupContext> grps = cctx.cache().cacheGroups();
+
+        return createPartitionsSingleMessage(exchangeId, clientOnlyExchange, sndCounters, newCntrMap, exchActions, grps);
+    }
+
+    /**
+     * Creates partitions single message for selected cache groups.
+     *
+     * @param exchangeId Exchange ID.
+     * @param clientOnlyExchange Client exchange flag.
+     * @param sndCounters {@code True} if partition update counters need to be sent.
+     * @param newCntrMap {@code True} if possible to use {@link CachePartitionPartialCountersMap}.
+     * @param grps Selected cache groups.
+     * @return Message.
+     */
+    public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(
+        @Nullable GridDhtPartitionExchangeId exchangeId,
+        boolean clientOnlyExchange,
+        boolean sndCounters,
+        boolean newCntrMap,
+        ExchangeActions exchActions,
+        Collection<CacheGroupContext> grps
+    ) {
         GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId,
             clientOnlyExchange,
             cctx.versions().last(),
@@ -1314,7 +1393,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         Map<Object, T2<Integer, GridPartitionStateMap>> dupData = new HashMap<>();
 
-        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+        for (CacheGroupContext grp : grps) {
             if (!grp.isLocal() && (exchActions == null || !exchActions.cacheGroupStopping(grp.groupId()))) {
                 GridDhtPartitionMap locMap = grp.topology().localPartitionMap();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce73c9d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e550a8b..0fe1a25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2080,22 +2080,24 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (centralizedAff || forceAffReassignment) {
                 assert !exchCtx.mergeExchanges();
 
+                Collection<CacheGroupContext> grpToRefresh = U.newHashSet(cctx.cache().cacheGroups().size());
+
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     if (grp.isLocal())
                         continue;
 
-                    boolean needRefresh = false;
-
                     try {
-                        needRefresh = grp.topology().initPartitionsWhenAffinityReady(res, this);
+                        if (grp.topology().initPartitionsWhenAffinityReady(res, this))
+                            grpToRefresh.add(grp);
                     }
                     catch (IgniteInterruptedCheckedException e) {
                         U.error(log, "Failed to initialize partitions.", e);
                     }
 
-                    if (needRefresh)
-                        cctx.exchange().refreshPartitions();
                 }
+
+                if (!grpToRefresh.isEmpty())
+                    cctx.exchange().refreshPartitions(grpToRefresh);
             }
 
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
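
The exchange-future change replaces a per-group refreshPartitions() call made
inside the loop with a collect-then-batch approach: groups whose partition
initialization reports work to do are gathered into a set, and a single
refreshPartitions(grpToRefresh) call is made after the loop. Presumably this
batching, one message pass covering all affected groups instead of one pass
per group, is the "improved background full message sending" the commit title
refers to. A self-contained Java sketch of the pattern (stand-in names, not
the Ignite API):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class BatchedRefreshSketch {
    /** Stand-in for a cache group; the flag says whether init found work. */
    static class Group {
        final String name;
        final boolean needsRefresh;

        Group(String name, boolean needsRefresh) {
            this.name = name;
            this.needsRefresh = needsRefresh;
        }

        /** Stand-in for topology().initPartitionsWhenAffinityReady(..). */
        boolean initPartitions() { return needsRefresh; }

        @Override public String toString() { return name; }
    }

    /** Stand-in for cctx.exchange().refreshPartitions(grps). */
    static void refreshPartitions(Collection<Group> grps) {
        System.out.println("Single refresh call for groups: " + grps);
    }

    public static void main(String[] args) {
        List<Group> allGroups = Arrays.asList(
            new Group("A", true), new Group("B", false), new Group("C", true));

        Collection<Group> grpToRefresh = new ArrayList<>(allGroups.size());

        for (Group grp : allGroups) {
            // Collect groups needing a refresh instead of refreshing each
            // one immediately inside the loop, as the old code did.
            if (grp.initPartitions())
                grpToRefresh.add(grp);
        }

        // One batched call at the end covers all collected groups.
        if (!grpToRefresh.isEmpty())
            refreshPartitions(grpToRefresh);
    }
}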