You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/07/11 09:27:47 UTC
[48/50] ignite git commit: IGNITE-5684 - Fixed update sequence
handling in GridDhtPartitionFullMap
IGNITE-5684 - Fixed update sequence handling in GridDhtPartitionFullMap
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4f3b69c7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4f3b69c7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4f3b69c7
Branch: refs/heads/master
Commit: 4f3b69c7018913618e7bbc724ab83ac6c274bc1f
Parents: 2a2c803
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Tue Jul 11 11:27:57 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Jul 11 11:27:57 2017 +0300
----------------------------------------------------------------------
.../dht/GridClientPartitionTopology.java | 71 +++++++++++++-------
.../dht/GridDhtPartitionTopologyImpl.java | 27 +++++---
.../dht/preloader/GridDhtPartitionFullMap.java | 24 +------
.../CacheLateAffinityAssignmentTest.java | 2 +-
4 files changed, 67 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4f3b69c7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index e751961..f4ed517 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -572,56 +572,66 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
}
+ /**
+ * Checks whether the current partition map should be overwritten by the new partition map.
+ * Returns {@code true} if the new map has a greater topology version, or the same topology version and a greater update sequence.
+ *
+ * @param currentMap Current partition map.
+ * @param newMap New partition map, may be {@code null}.
+ * @return {@code true} if the current partition map should be overwritten by the new one, {@code false} otherwise (including when {@code newMap} is {@code null}).
+ */
+ private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) {
+ return newMap != null &&
+ (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) > 0 ||
+ newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 && newMap.updateSequence() > currentMap.updateSequence());
+ }
+
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Override public boolean update(
- @Nullable AffinityTopologyVersion exchVer,
+ @Nullable AffinityTopologyVersion exchangeVer,
GridDhtPartitionFullMap partMap,
Map<Integer, T2<Long, Long>> cntrMap,
Set<Integer> partsToReload
) {
if (log.isDebugEnabled())
- log.debug("Updating full partition map [exchVer=" + exchVer + ", parts=" + fullMapString() + ']');
+ log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']');
lock.writeLock().lock();
try {
- if (exchVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchVer) >= 0) {
+ if (exchangeVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchangeVer) >= 0) {
if (log.isDebugEnabled())
log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
- lastExchangeVer + ", exchVer=" + exchVer + ']');
-
- return false;
- }
-
- if (node2part != null && node2part.compareTo(partMap) >= 0) {
- if (log.isDebugEnabled())
- log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
- lastExchangeVer + ", exchVer=" + exchVer + ", curMap=" + node2part + ", newMap=" + partMap + ']');
+ lastExchangeVer + ", exchVer=" + exchangeVer + ']');
return false;
}
- updateSeq.incrementAndGet();
-
- if (exchVer != null)
- lastExchangeVer = exchVer;
+ boolean fullMapUpdated = (node2part == null);
if (node2part != null) {
for (GridDhtPartitionMap part : node2part.values()) {
GridDhtPartitionMap newPart = partMap.get(part.nodeId());
- // If for some nodes current partition has a newer map,
- // then we keep the newer value.
- if (newPart != null && newPart.updateSequence() < part.updateSequence()) {
- if (log.isDebugEnabled())
- log.debug("Overriding partition map in full update map [exchVer=" + exchVer + ", curPart=" +
- mapString(part) + ", newPart=" + mapString(newPart) + ']');
+ if (shouldOverridePartitionMap(part, newPart)) {
+ fullMapUpdated = true;
+ if (log.isDebugEnabled())
+ log.debug("Overriding partition map in full update map [exchId=" + exchangeVer + ", curPart=" +
+ mapString(part) + ", newPart=" + mapString(newPart) + ']');
+ }
+ else {
+ // If for some nodes current partition has a newer map,
+ // then we keep the newer value.
partMap.put(part.nodeId(), part);
}
}
+ for (GridDhtPartitionMap part : partMap.values())
+ fullMapUpdated |= !node2part.containsKey(part);
+
+ // Remove entry if node left.
for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
UUID nodeId = it.next();
@@ -635,9 +645,24 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
}
+ if (!fullMapUpdated) {
+ if (log.isDebugEnabled())
+ log.debug("No updates for full partition map (will ignore) [lastExch=" +
+ lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']');
+
+ return false;
+ }
+
+ if (exchangeVer != null)
+ lastExchangeVer = exchangeVer;
+
+ node2part = partMap;
+
+ updateSeq.incrementAndGet();
+
part2node.clear();
- for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
+ for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
if (e0.getValue() != MOVING && e0.getValue() != OWNING)
continue;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4f3b69c7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index fe0a0c6..601da1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1134,22 +1134,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
return false;
}
- if (node2part != null && node2part.compareTo(partMap) > 0) {
- if (log.isDebugEnabled())
- log.debug("Stale partition map for full partition map update (will ignore) [lastExch=" +
- lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']');
-
- return false;
- }
-
- if (exchangeVer != null)
- lastExchangeVer = exchangeVer;
+ boolean fullMapUpdated = (node2part == null);
if (node2part != null) {
for (GridDhtPartitionMap part : node2part.values()) {
GridDhtPartitionMap newPart = partMap.get(part.nodeId());
if (shouldOverridePartitionMap(part, newPart)) {
+ fullMapUpdated = true;
+
if (log.isDebugEnabled())
log.debug("Overriding partition map in full update map [exchId=" + exchangeVer + ", curPart=" +
mapString(part) + ", newPart=" + mapString(newPart) + ']');
@@ -1161,6 +1154,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
+ for (GridDhtPartitionMap part : partMap.values())
+ fullMapUpdated |= !node2part.containsKey(part);
+
// Remove entry if node left.
for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
UUID nodeId = it.next();
@@ -1175,6 +1171,17 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
+ if (!fullMapUpdated) {
+ if (log.isDebugEnabled())
+ log.debug("No updates for full partition map (will ignore) [lastExch=" +
+ lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']');
+
+ return false;
+ }
+
+ if (exchangeVer != null)
+ lastExchangeVer = exchangeVer;
+
node2part = partMap;
AffinityTopologyVersion affVer = grp.affinity().lastVersion();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4f3b69c7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
index 27e6777..73b7714 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
@@ -31,8 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Full partition map.
*/
-public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap>
- implements Comparable<GridDhtPartitionFullMap>, Externalizable {
+public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap> implements Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -178,27 +177,6 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap>
}
/** {@inheritDoc} */
- @Override public int compareTo(GridDhtPartitionFullMap o) {
- assert nodeId == null || (nodeOrder != o.nodeOrder && !nodeId.equals(o.nodeId)) ||
- (nodeOrder == o.nodeOrder && nodeId.equals(o.nodeId)): "Inconsistent node order and ID [id1=" + nodeId +
- ", order1=" + nodeOrder + ", id2=" + o.nodeId + ", order2=" + o.nodeOrder + ']';
-
- if (nodeId == null && o.nodeId != null)
- return -1;
- else if (nodeId != null && o.nodeId == null)
- return 1;
- else if (nodeId == null && o.nodeId == null)
- return 0;
-
- int res = Long.compare(nodeOrder, o.nodeOrder);
-
- if (res == 0)
- res = Long.compare(updateSeq, o.updateSeq);
-
- return res;
- }
-
- /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeUuid(out, nodeId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4f3b69c7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 6174209..23043d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -181,7 +181,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
MemoryConfiguration cfg1 = new MemoryConfiguration();
- cfg1.setDefaultMemoryPolicySize(50 * 1024 * 1024L);
+ cfg1.setDefaultMemoryPolicySize(100 * 1024 * 1024L);
cfg.setMemoryConfiguration(cfg1);