You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/29 14:38:26 UTC
[12/16] ignite git commit: Fixed update sequence.
Fixed update sequence.
Signed-off-by: Andrey Gura <ag...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/78b7e3f0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/78b7e3f0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/78b7e3f0
Branch: refs/heads/master
Commit: 78b7e3f04028bf2470ac74ae8649f9e7f9aa692c
Parents: 7ce00b1
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Sat Aug 26 17:21:44 2017 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Tue Aug 29 17:32:04 2017 +0300
----------------------------------------------------------------------
.../dht/GridDhtPartitionTopologyImpl.java | 33 ++++++++++++++------
1 file changed, 23 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/78b7e3f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index a881130..e0f54b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -521,7 +521,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
DiscoveryEvent evt = evts0.get(i);
if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
- removeNode(evt.eventNode().id());
+ updateSeq = removeNode(evt.eventNode().id(), updateSeq);
}
}
@@ -1379,6 +1379,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
long updateSeq = this.updateSeq.incrementAndGet();
+ node2part.newUpdateSequence(updateSeq);
+
if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) {
AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer);
@@ -1535,8 +1537,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
GridDhtPartitionMap cur = node2part.get(parts.nodeId());
if (force) {
- if (cur != null && cur.topologyVersion().initialized())
+ if (cur != null && cur.topologyVersion().initialized()) {
parts.updateSequence(cur.updateSequence(), cur.topologyVersion());
+
+ this.updateSeq.setIfGreater(cur.updateSequence());
+ }
}
else if (isStaleUpdate(cur, parts)) {
U.warn(log, "Stale update for single partition map update (will ignore) [exchId=" + exchId +
@@ -1546,10 +1551,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
return false;
}
- long updateSeq = this.updateSeq.incrementAndGet();
-
- node2part.newUpdateSequence(updateSeq);
-
boolean changed = false;
if (cur == null || !cur.equals(parts))
@@ -1557,6 +1558,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
node2part.put(parts.nodeId(), parts);
+ this.updateSeq.setIfGreater(parts.updateSequence());
+
+ long updateSeq = this.updateSeq.incrementAndGet();
+
// During exchange diff is calculated after all messages are received and affinity initialized.
if (exchId == null && !grp.isReplicated()) {
if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) {
@@ -2110,8 +2115,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/**
* @param nodeId Node to remove.
+ * @param updateSeq Update sequence.
+ *
+ * @return Update sequence.
*/
- private void removeNode(UUID nodeId) {
+ private long removeNode(UUID nodeId, long updateSeq) {
assert nodeId != null;
assert lock.isWriteLockedByCurrentThread();
@@ -2122,11 +2130,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
ClusterNode loc = ctx.localNode();
if (node2part != null) {
- updateSeq.setIfGreater(node2part.updateSequence());
+ assert updateSeq >= node2part.updateSequence();
- if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id()))
- node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.incrementAndGet(),
+ if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) {
+ updateSeq++;
+
+ node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq,
node2part, false);
+ }
else
node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence());
@@ -2145,6 +2156,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
consistencyCheck();
}
+
+ return updateSeq;
}
/** {@inheritDoc} */