You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/23 15:01:12 UTC
[08/50] [abbrv] ignite git commit: IGNITE-9868 Improved background full message sending - Fixes #4975.
IGNITE-9868 Improved background full message sending - Fixes #4975.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ce73c9d8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ce73c9d8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ce73c9d8
Branch: refs/heads/ignite-gg-14206
Commit: ce73c9d85ffd5b50e4c3370b13302605392fa572
Parents: aec3f91
Author: Sergey Antonov <an...@gmail.com>
Authored: Wed Oct 17 20:08:34 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Oct 17 20:18:36 2018 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 177 ++++++++++++++-----
.../GridDhtPartitionsExchangeFuture.java | 12 +-
2 files changed, 135 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce73c9d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 0baf5a3..6af9678 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1028,11 +1028,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * Partition refresh callback.
+ * Partition refresh callback for selected cache groups.
* For coordinator causes {@link GridDhtPartitionsFullMessage FullMessages} send,
* for non coordinator - {@link GridDhtPartitionsSingleMessage SingleMessages} send
+ *
+ * @param grps Cache groups for partitions refresh.
*/
- public void refreshPartitions() {
+ public void refreshPartitions(@NotNull Collection<CacheGroupContext> grps) {
// TODO https://issues.apache.org/jira/browse/IGNITE-6857
if (cctx.snapshot().snapshotOperationInProgress()) {
scheduleResendPartitions();
@@ -1040,6 +1042,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return;
}
+ if (grps.isEmpty()) {
+ if (log.isDebugEnabled())
+ log.debug("Skip partitions refresh, there are no cache groups for partition refresh.");
+
+ return;
+ }
+
ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
if (oldest == null) {
@@ -1049,8 +1058,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return;
}
- if (log.isDebugEnabled())
- log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']');
+ if (log.isDebugEnabled()) {
+ log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() +
+ ", cacheGroups= " + grps + ']');
+ }
// If this is the oldest node.
if (oldest.id().equals(cctx.localNodeId())) {
@@ -1068,50 +1079,66 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
// No need to send to nodes which did not finish their first exchange.
AffinityTopologyVersion rmtTopVer =
- lastFut != null ? (lastFut.isDone() ? lastFut.topologyVersion() : lastFut.initialVersion()) : AffinityTopologyVersion.NONE;
+ lastFut != null ?
+ (lastFut.isDone() ? lastFut.topologyVersion() : lastFut.initialVersion())
+ : AffinityTopologyVersion.NONE;
Collection<ClusterNode> rmts = cctx.discovery().remoteAliveNodesWithCaches(rmtTopVer);
if (log.isDebugEnabled())
log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());
- sendAllPartitions(rmts, rmtTopVer);
+ sendAllPartitions(rmts, rmtTopVer, grps);
}
else {
if (log.isDebugEnabled())
log.debug("Refreshing local partitions from non-oldest node: " +
cctx.localNodeId());
- sendLocalPartitions(oldest, null);
+ sendLocalPartitions(oldest, null, grps);
}
}
/**
+ * Partition refresh callback.
+ * For coordinator causes {@link GridDhtPartitionsFullMessage FullMessages} send,
+ * for non coordinator - {@link GridDhtPartitionsSingleMessage SingleMessages} send
+ */
+ public void refreshPartitions() { refreshPartitions(cctx.cache().cacheGroups()); }
+
+ /**
* @param nodes Nodes.
* @param msgTopVer Topology version. Will be added to full message.
+ * @param grps Selected cache groups.
*/
private void sendAllPartitions(
Collection<ClusterNode> nodes,
- AffinityTopologyVersion msgTopVer
+ AffinityTopologyVersion msgTopVer,
+ Collection<CacheGroupContext> grps
) {
long time = System.currentTimeMillis();
- GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, false, null, null, null, null);
+ GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, false, null, null, null, null, grps);
m.topologyVersion(msgTopVer);
if (log.isInfoEnabled()) {
long latency = System.currentTimeMillis() - time;
- if (latency > 100 || log.isDebugEnabled())
- log.info("Full Message creating for " + msgTopVer + " performed in " + latency + " ms.");
+ if (latency > 50 || log.isDebugEnabled()) {
+ log.info("Finished full message creation [msgTopVer=" + msgTopVer + ", groups=" + grps +
+ ", latency=" + latency + "ms]");
+ }
}
if (log.isTraceEnabled())
- log.trace("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
+ log.trace("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", cacheGroups=" + grps +
+ ", msg=" + m + ']');
time = System.currentTimeMillis();
+ Collection<ClusterNode> failedNodes = U.newHashSet(nodes.size());
+
for (ClusterNode node : nodes) {
try {
assert !node.equals(cctx.localNode());
@@ -1119,22 +1146,34 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
cctx.io().sendNoRetry(node, m, SYSTEM_POOL);
}
catch (ClusterTopologyCheckedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" +
- node.id() + ", msg=" + m + ']');
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to send partition update to node because it left grid (will ignore) " +
+ "[node=" + node.id() + ", msg=" + m + ']');
+ }
}
catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']');
+ failedNodes.add(node);
+
+ U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']', e);
}
}
- if (log.isInfoEnabled())
- log.info("Sending Full Message for " + msgTopVer + " performed in " + (System.currentTimeMillis() - time) + " ms.");
+ if (log.isInfoEnabled()) {
+ long latency = System.currentTimeMillis() - time;
+
+ if (latency > 50 || log.isDebugEnabled()) {
+ log.info("Finished sending full message [msgTopVer=" + msgTopVer + ", groups=" + grps +
+ (failedNodes.isEmpty() ? "" : (", skipped=" + U.nodeIds(failedNodes))) +
+ ", latency=" + latency + "ms]");
+ }
+ }
}
/**
+ * Creates partitions full message for all cache groups.
+ *
* @param compress {@code True} if possible to compress message (properly work only if prepareMarshall/
- * finishUnmarshall methods are called).
+ * finishUnmarshall methods are called).
* @param newCntrMap {@code True} if possible to use {@link CachePartitionFullCountersMap}.
* @param exchId Non-null exchange ID if message is created for exchange.
* @param lastVer Last version.
@@ -1150,18 +1189,43 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@Nullable IgniteDhtPartitionsToReloadMap partsToReload
) {
- final GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
- lastVer,
- exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE,
- partHistSuppliers,
- partsToReload
- );
+ Collection<CacheGroupContext> grps = cctx.cache().cacheGroups();
+
+ return createPartitionsFullMessage(compress, newCntrMap, exchId, lastVer, partHistSuppliers, partsToReload, grps);
+ }
+
+ /**
+ * Creates partitions full message for selected cache groups.
+ *
+ * @param compress {@code True} if possible to compress message (properly work only if prepareMarshall/
+ * finishUnmarshall methods are called).
+ * @param newCntrMap {@code True} if possible to use {@link CachePartitionFullCountersMap}.
+ * @param exchId Non-null exchange ID if message is created for exchange.
+ * @param lastVer Last version.
+ * @param partHistSuppliers Partition history suppliers map.
+ * @param partsToReload Partitions to reload map.
+ * @param grps Selected cache groups.
+ * @return Message.
+ */
+ public GridDhtPartitionsFullMessage createPartitionsFullMessage(
+ boolean compress,
+ boolean newCntrMap,
+ @Nullable final GridDhtPartitionExchangeId exchId,
+ @Nullable GridCacheVersion lastVer,
+ @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
+ @Nullable IgniteDhtPartitionsToReloadMap partsToReload,
+ Collection<CacheGroupContext> grps
+ ) {
+ AffinityTopologyVersion ver = exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE;
+
+ final GridDhtPartitionsFullMessage m =
+ new GridDhtPartitionsFullMessage(exchId, lastVer, ver, partHistSuppliers, partsToReload);
m.compress(compress);
final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+ for (CacheGroupContext grp : grps) {
if (!grp.isLocal()) {
if (exchId != null) {
AffinityTopologyVersion startTopVer = grp.localStartVersion();
@@ -1174,14 +1238,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true);
- if (locMap != null) {
- addFullPartitionsMap(m,
- dupData,
- compress,
- grp.groupId(),
- locMap,
- affCache.similarAffinityKey());
- }
+ if (locMap != null)
+ addFullPartitionsMap(m, dupData, compress, grp.groupId(), locMap, affCache.similarAffinityKey());
m.addPartitionSizes(grp.groupId(), grp.topology().globalPartSizes());
@@ -1202,14 +1260,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
GridDhtPartitionFullMap map = top.partitionMap(true);
- if (map != null) {
- addFullPartitionsMap(m,
- dupData,
- compress,
- top.groupId(),
- map,
- top.similarAffinityKey());
- }
+ if (map != null)
+ addFullPartitionsMap(m, dupData, compress, top.groupId(), map, top.similarAffinityKey());
if (exchId != null) {
CachePartitionFullCountersMap cntrsMap = top.fullUpdateCounters();
@@ -1269,13 +1321,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param node Destination cluster node.
* @param id Exchange ID.
+ * @param grps Cache groups for send partitions.
*/
- private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
- GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id,
- cctx.kernalContext().clientNode(),
- false,
- false,
- null);
+ private void sendLocalPartitions(
+ ClusterNode node,
+ @Nullable GridDhtPartitionExchangeId id,
+ @NotNull Collection<CacheGroupContext> grps
+ ) {
+ GridDhtPartitionsSingleMessage m =
+ createPartitionsSingleMessage(id, cctx.kernalContext().clientNode(), false, false, null, grps);
if (log.isTraceEnabled())
log.trace("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']');
@@ -1294,6 +1348,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * Creates partitions single message for all cache groups.
+ *
* @param exchangeId Exchange ID.
* @param clientOnlyExchange Client exchange flag.
* @param sndCounters {@code True} if need send partition update counters.
@@ -1307,6 +1363,29 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean newCntrMap,
ExchangeActions exchActions
) {
+ Collection<CacheGroupContext> grps = cctx.cache().cacheGroups();
+
+ return createPartitionsSingleMessage(exchangeId, clientOnlyExchange, sndCounters, newCntrMap, exchActions, grps);
+ }
+
+ /**
+ * Creates partitions single message for selected cache groups.
+ *
+ * @param exchangeId Exchange ID.
+ * @param clientOnlyExchange Client exchange flag.
+ * @param sndCounters {@code True} if need send partition update counters.
+ * @param newCntrMap {@code True} if possible to use {@link CachePartitionPartialCountersMap}.
+ * @param grps Selected cache groups.
+ * @return Message.
+ */
+ public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(
+ @Nullable GridDhtPartitionExchangeId exchangeId,
+ boolean clientOnlyExchange,
+ boolean sndCounters,
+ boolean newCntrMap,
+ ExchangeActions exchActions,
+ Collection<CacheGroupContext> grps
+ ) {
GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId,
clientOnlyExchange,
cctx.versions().last(),
@@ -1314,7 +1393,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
Map<Object, T2<Integer, GridPartitionStateMap>> dupData = new HashMap<>();
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+ for (CacheGroupContext grp : grps) {
if (!grp.isLocal() && (exchActions == null || !exchActions.cacheGroupStopping(grp.groupId()))) {
GridDhtPartitionMap locMap = grp.topology().localPartitionMap();
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce73c9d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e550a8b..0fe1a25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2080,22 +2080,24 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (centralizedAff || forceAffReassignment) {
assert !exchCtx.mergeExchanges();
+ Collection<CacheGroupContext> grpToRefresh = U.newHashSet(cctx.cache().cacheGroups().size());
+
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal())
continue;
- boolean needRefresh = false;
-
try {
- needRefresh = grp.topology().initPartitionsWhenAffinityReady(res, this);
+ if (grp.topology().initPartitionsWhenAffinityReady(res, this))
+ grpToRefresh.add(grp);
}
catch (IgniteInterruptedCheckedException e) {
U.error(log, "Failed to initialize partitions.", e);
}
- if (needRefresh)
- cctx.exchange().refreshPartitions();
}
+
+ if (!grpToRefresh.isEmpty())
+ cctx.exchange().refreshPartitions(grpToRefresh);
}
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {