You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/27 16:39:23 UTC
[10/15] ignite git commit: ignite-5682 Added stale version check for
GridDhtPartFullMessage not related to exchange.
ignite-5682 Added stale version check for GridDhtPartFullMessage not related to exchange.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eb9d06d9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eb9d06d9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eb9d06d9
Branch: refs/heads/ignite-5578
Commit: eb9d06d9ff89388318cc3a857276c154675bc6f1
Parents: 3a3650f
Author: Dmitry Pavlov <dp...@gmail.com>
Authored: Thu Jul 27 14:51:25 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 27 14:51:25 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 5 +-
.../processors/cache/GridCacheIoManager.java | 2 +-
.../GridCachePartitionExchangeManager.java | 44 ++++++++++-----
.../dht/GridClientPartitionTopology.java | 12 +++-
.../dht/GridDhtPartitionTopology.java | 9 ++-
.../dht/GridDhtPartitionTopologyImpl.java | 34 +++++++-----
.../preloader/GridDhtPartitionExchangeId.java | 2 +-
.../GridDhtPartitionsExchangeFuture.java | 36 ++++++++----
.../preloader/GridDhtPartitionsFullMessage.java | 4 +-
...cingDelayedPartitionMapExchangeSelfTest.java | 58 ++++++++++++++++----
.../junits/common/GridCommonAbstractTest.java | 6 +-
11 files changed, 151 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 0f46a90..5a7f634 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -442,7 +442,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
grp.topology().update(topVer,
clientTop.partitionMap(true),
clientTop.updateCounters(false),
- Collections.<Integer>emptySet());
+ Collections.<Integer>emptySet(),
+ null);
}
grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity());
@@ -504,7 +505,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
grp.topology().updateTopologyVersion(topFut, discoCache, -1, false);
- grp.topology().update(topVer, partMap, null, Collections.<Integer>emptySet());
+ grp.topology().update(topVer, partMap, null, Collections.<Integer>emptySet(), null);
topFut.validate(grp, discoCache.allNodes());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 9f1873e..981c6e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -533,7 +533,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
/**
- * @param nodeId Node ID.
+ * @param nodeId Sender Node ID.
* @param cacheMsg Cache message.
* @param c Handler closure.
* @param plc Message policy.
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d4fe93f..6a7258f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -129,7 +129,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private static final int EXCHANGE_HISTORY_SIZE =
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE, 1_000);
- /** Atomic reference for pending timeout object. */
+ /** Atomic reference for pending partition resend timeout object. */
private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
/** Partition resend timeout after eviction. */
@@ -150,7 +150,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private final ConcurrentMap<Integer, GridClientPartitionTopology> clientTops = new ConcurrentHashMap8<>();
/** */
- private volatile GridDhtPartitionsExchangeFuture lastInitializedFut;
+ @Nullable private volatile GridDhtPartitionsExchangeFuture lastInitializedFut;
/** */
private final AtomicReference<GridDhtTopologyFuture> lastFinishedFut = new AtomicReference<>();
@@ -877,6 +877,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* Partition refresh callback.
+ * On the coordinator this causes {@link GridDhtPartitionsFullMessage FullMessages} to be sent;
+ * on a non-coordinator node, {@link GridDhtPartitionsSingleMessage SingleMessages} are sent.
*/
private void refreshPartitions() {
ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
@@ -914,7 +916,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (log.isDebugEnabled())
log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());
- sendAllPartitions(rmts);
+ sendAllPartitions(rmts, rmtTopVer);
}
else {
if (log.isDebugEnabled())
@@ -927,10 +929,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param nodes Nodes.
+ * @param msgTopVer Topology version. Will be added to full message.
*/
- private void sendAllPartitions(Collection<ClusterNode> nodes) {
+ private void sendAllPartitions(Collection<ClusterNode> nodes,
+ AffinityTopologyVersion msgTopVer) {
GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, null, null, null, null);
+ m.topologyVersion(msgTopVer);
+
if (log.isDebugEnabled())
log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
@@ -956,6 +962,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* finishUnmarshall methods are called).
* @param exchId Non-null exchange ID if message is created for exchange.
* @param lastVer Last version.
+ * @param partHistSuppliers
+ * @param partsToReload
* @return Message.
*/
public GridDhtPartitionsFullMessage createPartitionsFullMessage(
@@ -1064,8 +1072,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param node Node.
- * @param id ID.
+ * @param node Destination cluster node.
+ * @param id Exchange ID.
*/
private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node,
@@ -1091,7 +1099,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param targetNode Target node.
- * @param exchangeId ID.
+ * @param exchangeId Exchange ID.
* @param clientOnlyExchange Client exchange flag.
* @param sndCounters {@code True} if need send partition update counters.
* @return Message.
@@ -1297,7 +1305,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param node Node.
+ * @param node Sender cluster node.
* @param msg Message.
*/
private void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMessage msg) {
@@ -1323,8 +1331,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
else if (!grp.isLocal())
top = grp.topology();
- if (top != null)
- updated |= top.update(null, entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), grpId));
+ if (top != null) {
+ updated |= top.update(null,
+ entry.getValue(),
+ null,
+ msg.partsToReload(cctx.localNodeId(), grpId),
+ msg.topologyVersion());
+ }
}
if (!cctx.kernalContext().clientNode() && updated)
@@ -1352,7 +1365,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param node Node ID.
+ * @param node Sender cluster node.
* @param msg Message.
*/
private void processSinglePartitionUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
@@ -1418,7 +1431,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param node Node ID.
+ * @param node Sender cluster node.
* @param msg Message.
*/
private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSingleRequest msg) {
@@ -2250,7 +2263,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** */
private static final long serialVersionUID = 0L;
- /** {@inheritDoc} */
+ /**
+ * @param nodeId Sender node ID.
+ * @param msg Message.
+ */
@Override public void apply(UUID nodeId, M msg) {
ClusterNode node = cctx.node(nodeId);
@@ -2268,7 +2284,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param node Node.
+ * @param node Sender cluster node.
* @param msg Message.
*/
protected abstract void onMessage(ClusterNode node, M msg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 7343dba..4b9826e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -594,8 +594,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
@Nullable AffinityTopologyVersion exchangeVer,
GridDhtPartitionFullMap partMap,
Map<Integer, T2<Long, Long>> cntrMap,
- Set<Integer> partsToReload
- ) {
+ Set<Integer> partsToReload,
+ @Nullable AffinityTopologyVersion msgTopVer) {
if (log.isDebugEnabled())
log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']');
@@ -610,6 +610,14 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
return false;
}
+ if (msgTopVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(msgTopVer) > 0) {
+ if (log.isDebugEnabled())
+ log.debug("Stale topology version for full partition map update message (will ignore) " +
+ "[lastExchId=" + lastExchangeVer + ", topVersion=" + msgTopVer + ']');
+
+ return false;
+ }
+
boolean fullMapUpdated = (node2part == null);
if (node2part != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d9e04a6..81d92e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -243,16 +243,21 @@ public interface GridDhtPartitionTopology {
public void onRemoved(GridDhtCacheEntry e);
/**
- * @param exchangeVer Exchange version.
+ * @param exchangeVer Topology version from exchange. Value should be greater than previously passed. Null value
+ * means the received full map is not related to an exchange.
* @param partMap Update partition map.
* @param cntrMap Partition update counters.
+ * @param partsToReload
+ * @param msgTopVer Topology version from incoming message. This value is non-null only when the message is not
+ * related to an exchange. Value should be not less than the previous 'Topology version from exchange'.
* @return {@code True} if local state was changed.
*/
public boolean update(
@Nullable AffinityTopologyVersion exchangeVer,
GridDhtPartitionFullMap partMap,
@Nullable Map<Integer, T2<Long, Long>> cntrMap,
- Set<Integer> partsToReload);
+ Set<Integer> partsToReload,
+ @Nullable AffinityTopologyVersion msgTopVer);
/**
* @param exchId Exchange ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 842501e..a8e13a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -440,7 +440,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent())
removeNode(exchId.nodeId());
-
+
ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
if (log.isDebugEnabled()) {
@@ -1099,9 +1099,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
@Override public boolean update(
@Nullable AffinityTopologyVersion exchangeVer,
GridDhtPartitionFullMap partMap,
- @Nullable Map<Integer, T2<Long, Long>> cntrMap,
- Set<Integer> partsToReload
- ) {
+ @Nullable Map<Integer, T2<Long, Long>> incomeCntrMap,
+ Set<Integer> partsToReload,
+ @Nullable AffinityTopologyVersion msgTopVer) {
if (log.isDebugEnabled())
log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']');
@@ -1113,12 +1113,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (stopping)
return false;
- if (cntrMap != null) {
+ if (incomeCntrMap != null) {
// update local map partition counters
- for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
- T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+ for (Map.Entry<Integer, T2<Long, Long>> e : incomeCntrMap.entrySet()) {
+ T2<Long, Long> existCntr = this.cntrMap.get(e.getKey());
- if (cntr == null || cntr.get2() < e.getValue().get2())
+ if (existCntr == null || existCntr.get2() < e.getValue().get2())
this.cntrMap.put(e.getKey(), e.getValue());
}
@@ -1129,7 +1129,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (part == null)
continue;
- T2<Long, Long> cntr = cntrMap.get(part.id());
+ T2<Long, Long> cntr = incomeCntrMap.get(part.id());
if (cntr != null)
part.updateCounter(cntr.get2());
@@ -1144,6 +1144,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
return false;
}
+ if (msgTopVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(msgTopVer) > 0) {
+ if (log.isDebugEnabled())
+ log.debug("Stale version for full partition map update message (will ignore) [lastExch=" +
+ lastExchangeVer + ", topVersion=" + msgTopVer + ']');
+
+ return false;
+ }
+
boolean fullMapUpdated = (node2part == null);
if (node2part != null) {
@@ -1244,8 +1252,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
assert locPart != null;
- if (cntrMap != null) {
- T2<Long, Long> cntr = cntrMap.get(p);
+ if (incomeCntrMap != null) {
+ T2<Long, Long> cntr = incomeCntrMap.get(p);
if (cntr != null && cntr.get2() > locPart.updateCounter())
locPart.updateCounter(cntr.get2());
@@ -1271,8 +1279,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
changed = true;
}
- if (cntrMap != null) {
- T2<Long, Long> cntr = cntrMap.get(p);
+ if (incomeCntrMap != null) {
+ T2<Long, Long> cntr = incomeCntrMap.get(p);
if (cntr != null && cntr.get2() > locPart.updateCounter())
locPart.updateCounter(cntr.get2());
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
index 0a49415..1a4dabf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
@@ -51,7 +51,7 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa
@GridToStringExclude
private UUID nodeId;
- /** Event. */
+ /** Event type. */
@GridToStringExclude
private int evt;
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index cdb4bb7..71e41b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -127,7 +127,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
@GridToStringExclude
private final Set<UUID> remaining = new HashSet<>();
- /** */
+ /** Guarded by this */
@GridToStringExclude
private int pendingSingleUpdates;
@@ -154,7 +154,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
@GridToStringExclude
private final CountDownLatch evtLatch = new CountDownLatch(1);
- /** */
+ /** Exchange future init method completes this future. */
private GridFutureAdapter<Boolean> initFut;
/** */
@@ -196,7 +196,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** Init timestamp. Used to track the amount of time spent to complete the future. */
private long initTs;
- /** */
+ /**
+ * Centralized affinity assignment required. Activated for node left or failed. For this mode crd will send full
+ * partition maps to nodes using discovery (ring) instead of communication.
+ */
private boolean centralizedAff;
/** Change global state exception. */
@@ -613,7 +616,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
top.update(topologyVersion(),
clientTop.partitionMap(true),
clientTop.updateCounters(false),
- Collections.<Integer>emptySet());
+ Collections.<Integer>emptySet(),
+ null);
}
}
@@ -1092,7 +1096,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * @param node Node.
+ * @param node Target Node.
* @throws IgniteCheckedException If failed.
*/
private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException {
@@ -1191,7 +1195,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * @param oldestNode Oldest node.
+ * @param oldestNode Oldest node. Target node to send message to.
*/
private void sendPartitions(ClusterNode oldestNode) {
try {
@@ -1368,6 +1372,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * Processing of received single message. Actual processing in future may be delayed if init method was not
+ * completed, see {@link #initDone()}
+ *
* @param node Sender node.
* @param msg Single partition info.
*/
@@ -1409,11 +1416,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * Note this method performs heavy updatePartitionSingleMap operation, this operation is moved out from the
+ * synchronized block. Only count of such updates {@link #pendingSingleUpdates} is managed under critical section.
+ *
* @param node Sender node.
- * @param msg Message.
+ * @param msg Partition single message.
*/
private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
- boolean allReceived = false;
+ boolean allReceived = false; // Received all expected messages.
boolean updateSingleMap = false;
synchronized (this) {
@@ -1895,7 +1905,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @param node Sender node.
- * @param msg Message.
+ * @param msg Message with full partition info.
*/
private void processMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
assert exchId.equals(msg.exchangeId()) : msg;
@@ -1953,7 +1963,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
grp.topology().update(topologyVersion(),
entry.getValue(),
cntrMap,
- msg.partsToReload(cctx.localNodeId(), grpId));
+ msg.partsToReload(cctx.localNodeId(), grpId),
+ null);
}
else {
ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
@@ -1962,7 +1973,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.exchange().clientTopology(grpId, this).update(topologyVersion(),
entry.getValue(),
cntrMap,
- Collections.<Integer>emptySet());
+ Collections.<Integer>emptySet(),
+ null);
}
}
}
@@ -2054,7 +2066,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- *
+ * Moves exchange future to state 'init done' using {@link #initFut}.
*/
private void initDone() {
while (!isDone()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 75609b8..acc4dbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -109,7 +109,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
/**
* @param id Exchange ID.
* @param lastVer Last version.
- * @param topVer Topology version.
+ * @param topVer Topology version. For messages not related to exchange may be {@link AffinityTopologyVersion#NONE}.
+ * @param partHistSuppliers
+ * @param partsToReload
*/
public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id,
@Nullable GridCacheVersion lastVer,
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
index dc141db..f307b6a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
@@ -18,14 +18,17 @@
package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
@@ -45,12 +48,19 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
/** */
protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
- /** */
+ /** Map of destination node ID to runnable with logic for real message sending.
+ * To apply real message sending use run method */
private final ConcurrentHashMap8<UUID, Runnable> rs = new ConcurrentHashMap8<>();
- /** */
+ /**
+ * Flag to redirect {@link GridDhtPartitionsFullMessage}s from real communication channel to {@link #rs} map.
+ * Applied only to messages not related to particular exchange
+ */
private volatile boolean record = false;
+ /** */
+ private AtomicBoolean replay = new AtomicBoolean();
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration iCfg = super.getConfiguration(igniteInstanceName);
@@ -74,13 +84,26 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
final Object msg0 = ((GridIoMessage)msg).message();
+ if (log.isDebugEnabled())
+ log.debug("Message [thread=" + Thread.currentThread().getName() + ", msg=" + msg0 + ']');
+
if (msg0 instanceof GridDhtPartitionsFullMessage && record &&
- ((GridDhtPartitionsFullMessage)msg0).exchangeId() == null) {
- rs.putIfAbsent(node.id(), new Runnable() {
+ ((GridDhtPartitionsAbstractMessage)msg0).exchangeId() == null) {
+ if (log.isDebugEnabled())
+ log.debug("Record message [toNode=" + node.id() + ", msg=" + msg + "]");
+
+ assert !replay.get() : "Record of message is not allowed after replay";
+
+ Runnable prevValue = rs.putIfAbsent(node.id(), new Runnable() {
@Override public void run() {
+ if (log.isDebugEnabled())
+ log.debug("Replay: " + msg);
+
DelayableCommunicationSpi.super.sendMessage(node, msg, ackC);
}
});
+
+ assert prevValue == null : "Duplicate message registered to [" + node.id() + "]";
}
else
try {
@@ -94,10 +117,10 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
}
/**
- * @throws Exception e.
+ * @throws Exception If failed.
*/
public void test() throws Exception {
- IgniteKernal ignite = (IgniteKernal)startGrid(0);
+ IgniteEx ignite = startGrid(0);
CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
@@ -144,10 +167,7 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
awaitPartitionMapExchange();
- for (Runnable r : rs.values())
- r.run();
-
- U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages.
+ replayMessages();
stopGrid(3); // Forces exchange at all nodes and cause assertion failure in case obsolete partition map accepted.
@@ -167,6 +187,22 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
assert grid(2).context().cache().context().exchange().readyAffinityVersion().topologyVersion() > topVer2;
}
+ /**
+ * Replays all saved messages from the map, performing the actual send.
+ *
+ * @throws IgniteInterruptedCheckedException If interrupted.
+ */
+ private void replayMessages() throws IgniteInterruptedCheckedException {
+ record = false;
+
+ for (Runnable r : rs.values())
+ r.run(); // Causes real messages sending.
+
+ assertTrue(replay.compareAndSet(false, true));
+
+ U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages.
+ }
+
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb9d06d9/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index dc7e89d..c2cf41c 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -520,7 +520,8 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
/**
* @param waitEvicts If {@code true} will wait for evictions finished.
* @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update finished.
- * @param nodes Optional nodes.
+ * @param nodes Optional nodes. If {@code null}, the method waits for all nodes; for a non-null collection, nodes will
+ * be filtered.
* @throws InterruptedException If interrupted.
*/
@SuppressWarnings("BusyWait")
@@ -542,7 +543,8 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
/**
* @param waitEvicts If {@code true} will wait for evictions finished.
* @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update finished.
- * @param nodes Optional nodes.
+ * @param nodes Optional nodes. If {@code null}, the method waits for all nodes; for a non-null collection, nodes will
+ * be filtered.
* @param printPartState If {@code true} will print partition state if evictions not happened.
* @throws InterruptedException If interrupted.
*/