You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/03 07:22:27 UTC
[49/50] [abbrv] ignite git commit: Optimizations from ignite-5068.
Optimizations from ignite-5068.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/58b6e05e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/58b6e05e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/58b6e05e
Branch: refs/heads/ignite-gg-8.0.3.ea6-clients-test
Commit: 58b6e05e82978c68ba9e2e53a3b9b866e2c474ca
Parents: 52556f4
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Fri Apr 28 19:57:32 2017 +0300
Committer: Ilya Lantukh <il...@gridgain.com>
Committed: Fri Apr 28 19:57:32 2017 +0300
----------------------------------------------------------------------
.../dht/GridClientPartitionTopology.java | 5 +
.../dht/GridDhtPartitionTopology.java | 6 +
.../dht/GridDhtPartitionTopologyImpl.java | 473 +++++++++++--------
.../GridDhtPartitionsExchangeFuture.java | 9 +
4 files changed, 302 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/58b6e05e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5c5a3c4..e71c18f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -778,6 +778,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public void onExchangeDone(AffinityAssignment assignment) {
+ // no-op
+ }
+
+ /** {@inheritDoc} */
@Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) {
assert false : "detectLostPartitions should never be called on client topology";
http://git-wip-us.apache.org/repos/asf/ignite/blob/58b6e05e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 2bef267..5df8714 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -326,4 +326,10 @@ public interface GridDhtPartitionTopology {
* @return Set of node IDs that should reload partitions.
*/
public Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory, boolean updateSeq);
+
+ /**
+ * Callback on exchange done.
+ * @param assignment New affinity assignment.
+ */
+ public void onExchangeDone(AffinityAssignment assignment);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/58b6e05e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index fb09b38..d1283c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -79,6 +79,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
private static final boolean FULL_MAP_DEBUG = false;
/** */
+ private static final boolean FAST_DIFF_REBUILD = true;
+
+ /** */
private static final Long ZERO = 0L;
/** Context. */
@@ -93,8 +96,11 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
/** Node to partition map. */
private GridDhtPartitionFullMap node2part;
- /** Partition to node map. */
- private final Map<Integer, Set<UUID>> part2node;
+ /** Partition ID to IDs of nodes that hold the partition but are not in its affinity assignment. */
+ private final Map<Integer, Set<UUID>> diffFromAffinity = new HashMap<>();
+
+ /** Affinity topology version that {@link #diffFromAffinity} was last computed against. */
+ private volatile AffinityTopologyVersion diffFromAffinityVer = AffinityTopologyVersion.NONE;
/** */
private GridDhtPartitionExchangeId lastExchangeId;
@@ -142,8 +148,6 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
log = cctx.logger(getClass());
locParts = new AtomicReferenceArray<>(cctx.config().getAffinity().partitions());
-
- part2node = new HashMap<>(cctx.config().getAffinity().partitions(), 1.0f);
}
/** {@inheritDoc} */
@@ -160,7 +164,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
try {
node2part = null;
- part2node.clear();
+ diffFromAffinity.clear();
lastExchangeId = null;
@@ -168,6 +172,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
topReadyFut = null;
+ diffFromAffinityVer = AffinityTopologyVersion.NONE;
+
rebalancedTopVer = AffinityTopologyVersion.NONE;
topVer = AffinityTopologyVersion.NONE;
@@ -863,18 +869,42 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
List<ClusterNode> nodes = null;
- Collection<UUID> nodeIds = part2node.get(p);
+ if (!topVer.equals(diffFromAffinityVer)) {
+ log.error("??? node2part [topVer=" + topVer + ", diffVer=" + diffFromAffinityVer + "]");
+
+ nodes = new ArrayList<>();
+
+ nodes.addAll(affNodes);
- if (!F.isEmpty(nodeIds)) {
- for (UUID nodeId : nodeIds) {
- HashSet<UUID> affIds = affAssignment.getIds(p);
+ for (Map.Entry<UUID, GridDhtPartitionMap2> entry : node2part.entrySet()) {
+ GridDhtPartitionState state = entry.getValue().get(p);
- if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING)) {
+ ClusterNode n = cctx.discovery().node(entry.getKey());
+
+ if (n != null && state != null && (state == MOVING || state == OWNING) && !nodes.contains(n)
+ && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
+ nodes.add(n);
+ }
+
+ }
+
+ return nodes;
+ }
+
+ Collection<UUID> diffIds = diffFromAffinity.get(p);
+
+ if (!F.isEmpty(diffIds)) {
+ HashSet<UUID> affIds = affAssignment.getIds(p);
+
+ for (UUID nodeId : diffIds) {
+ assert !affIds.contains(nodeId);
+
+ if (hasState(p, nodeId, OWNING, MOVING)) {
ClusterNode n = cctx.discovery().node(nodeId);
if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
if (nodes == null) {
- nodes = new ArrayList<>(affNodes.size() + 2);
+ nodes = new ArrayList<>(affNodes.size() + diffIds.size());
nodes.addAll(affNodes);
}
@@ -903,7 +933,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
AffinityTopologyVersion topVer,
GridDhtPartitionState state,
GridDhtPartitionState... states) {
- Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId())) : null;
+ Collection<UUID> allIds = F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId()));
lock.readLock().lock();
@@ -913,20 +943,10 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
", node2part=" + node2part +
", cache=" + cctx.name() + ']';
- Collection<UUID> nodeIds = part2node.get(p);
-
// Node IDs can be null if both, primary and backup, nodes disappear.
- int size = nodeIds == null ? 0 : nodeIds.size();
-
- if (size == 0)
- return Collections.emptyList();
-
- List<ClusterNode> nodes = new ArrayList<>(size);
-
- for (UUID id : nodeIds) {
- if (topVer.topologyVersion() > 0 && !F.contains(allIds, id))
- continue;
+ List<ClusterNode> nodes = new ArrayList<>();
+ for (UUID id : allIds) {
if (hasState(p, id, state, states)) {
ClusterNode n = cctx.discovery().node(id);
@@ -1098,30 +1118,39 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
node2part = partMap;
- part2node.clear();
+ AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+
+ if (diffFromAffinityVer.compareTo(affVer) <= 0) {
+ AffinityAssignment affAssignment = cctx.affinity().assignment(affVer);
- for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) {
- for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
- if (e0.getValue() != MOVING && e0.getValue() != OWNING)
- continue;
+ for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) {
+ for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
+ int p = e0.getKey();
- int p = e0.getKey();
+ Set<UUID> diffIds = diffFromAffinity.get(p);
- Set<UUID> ids = part2node.get(p);
+ if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue() == RENTING) &&
+ !affAssignment.getIds(p).contains(e.getKey())) {
- if (ids == null)
- // Initialize HashSet to size 3 in anticipation that there won't be
- // more than 3 nodes per partitions.
- part2node.put(p, ids = U.newHashSet(3));
+ if (diffIds == null)
+ diffFromAffinity.put(p, diffIds = U.newHashSet(3));
- ids.add(e.getKey());
+ diffIds.add(e.getKey());
+ }
+ else {
+ if (diffIds != null && diffIds.remove(e.getKey())) {
+ if (diffIds.isEmpty())
+ diffFromAffinity.remove(p);
+ }
+ }
+ }
}
+
+ diffFromAffinityVer = affVer;
}
boolean changed = false;
- AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
-
GridDhtPartitionMap2 nodeMap = partMap.get(cctx.localNodeId());
if (nodeMap != null && cctx.shared().database().persistenceEnabled()) {
@@ -1296,37 +1325,52 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
node2part.put(parts.nodeId(), parts);
- // Add new mappings.
- for (Map.Entry<Integer,GridDhtPartitionState> e : parts.entrySet()) {
- int p = e.getKey();
+ AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
- Set<UUID> ids = part2node.get(p);
+ if (affVer.compareTo(diffFromAffinityVer) >= 0) {
+ AffinityAssignment affAssignment = cctx.affinity().assignment(affVer);
- if (e.getValue() == MOVING || e.getValue() == OWNING) {
- if (ids == null)
- // Initialize HashSet to size 3 in anticipation that there won't be
- // more than 3 nodes per partition.
- part2node.put(p, ids = U.newHashSet(3));
+ // Add new mappings.
+ for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) {
+ int p = e.getKey();
- changed |= ids.add(parts.nodeId());
- }
- else {
- if (ids != null)
- changed |= ids.remove(parts.nodeId());
+ Set<UUID> diffIds = diffFromAffinity.get(p);
+
+ if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue() == RENTING)
+ && !affAssignment.getIds(p).contains(parts.nodeId())) {
+ if (diffIds == null)
+ diffFromAffinity.put(p, diffIds = U.newHashSet(3));
+
+ if (diffIds.add(parts.nodeId()))
+ changed = true;
+ }
+ else {
+ if (diffIds != null && diffIds.remove(parts.nodeId())) {
+ changed = true;
+
+ if (diffIds.isEmpty())
+ diffFromAffinity.remove(p);
+ }
+
+ }
}
- }
- // Remove obsolete mappings.
- if (cur != null) {
- for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
- Set<UUID> ids = part2node.get(p);
+ // Remove obsolete mappings.
+ if (cur != null) {
+ for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
+ Set<UUID> ids = diffFromAffinity.get(p);
- if (ids != null)
- changed |= ids.remove(parts.nodeId());
+ if (ids != null && ids.remove(parts.nodeId())) {
+ changed = true;
+
+ if (ids.isEmpty())
+ diffFromAffinity.remove(p);
+ }
+ }
}
- }
- AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+ diffFromAffinityVer = affVer;
+ }
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
@@ -1352,44 +1396,96 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
}
/** {@inheritDoc} */
- @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) {
+ @Override public void onExchangeDone(AffinityAssignment assignment) {
lock.writeLock().lock();
try {
- int parts = cctx.affinity().partitions();
+ if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0)
+ rebuildDiff(assignment);
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * @param affAssignment New affinity assignment.
+ */
+ private void rebuildDiff(AffinityAssignment affAssignment) {
+ assert lock.isWriteLockedByCurrentThread();
- Collection<Integer> lost = null;
+ if (node2part == null)
+ return;
- for (int p = 0; p < parts; p++) {
- boolean foundOwner = false;
+ if (FAST_DIFF_REBUILD) {
+ Collection<UUID> affNodes = F.nodeIds(cctx.discovery().cacheAffinityNodes(cctx.name(), affAssignment.topologyVersion()));
- Set<UUID> nodeIds = part2node.get(p);
+ for (Map.Entry<Integer, Set<UUID>> e : diffFromAffinity.entrySet()) {
+ int p = e.getKey();
+
+ Iterator<UUID> iter = e.getValue().iterator();
+
+ while (iter.hasNext()) {
+ UUID nodeId = iter.next();
- if (nodeIds != null) {
- for (UUID nodeId : nodeIds) {
- GridDhtPartitionMap2 partMap = node2part.get(nodeId);
+ if (!affNodes.contains(nodeId) || affAssignment.getIds(p).contains(nodeId))
+ iter.remove();
+ }
+ }
+ }
+ else {
+ for (Map.Entry<UUID, GridDhtPartitionMap2> e : node2part.entrySet()) {
+ UUID nodeId = e.getKey();
- GridDhtPartitionState state = partMap.get(p);
+ for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
+ int p0 = e0.getKey();
- if (state == OWNING) {
- foundOwner = true;
+ GridDhtPartitionState state = e0.getValue();
- break;
- }
+ Set<UUID> ids = diffFromAffinity.get(p0);
+
+ if ((state == MOVING || state == OWNING) && !affAssignment.getIds(p0).contains(nodeId)) {
+ if (ids == null)
+ diffFromAffinity.put(p0, ids = U.newHashSet(3));
+
+ ids.add(nodeId);
+ }
+ else {
+ if (ids != null)
+ ids.remove(nodeId);
}
}
+ }
+ }
+
+ diffFromAffinityVer = affAssignment.topologyVersion();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) {
+ lock.writeLock().lock();
+
+ try {
+ if (node2part == null)
+ return false;
+
+ int parts = cctx.affinity().partitions();
- if (!foundOwner) {
- if (lost == null)
- lost = new HashSet<>(parts - p, 1.0f);
+ Set<Integer> lost = new HashSet<>(parts);
- lost.add(p);
+ for (int p = 0; p < parts; p++)
+ lost.add(p);
+
+ for (GridDhtPartitionMap2 partMap : node2part.values()) {
+ for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) {
+ if (e.getValue() == OWNING)
+ lost.remove(e.getKey());
}
}
boolean changed = false;
- if (lost != null) {
+ if (!lost.isEmpty()) {
PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy();
assert plc != null;
@@ -1410,16 +1506,17 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
}
// Update map for remote node.
else if (plc != PartitionLossPolicy.IGNORE) {
- Set<UUID> nodeIds = part2node.get(part);
-
- if (nodeIds != null) {
- for (UUID nodeId : nodeIds) {
- GridDhtPartitionMap2 nodeMap = node2part.get(nodeId);
-
- if (nodeMap.get(part) != EVICTED)
- nodeMap.put(part, LOST);
- }
- }
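+ // TODO: re-implement marking the partition LOST on remote nodes without the removed part2node map; previous logic kept below for reference.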
+// Set<UUID> nodeIds = part2node.get(part);
+//
+// if (nodeIds != null) {
+// for (UUID nodeId : nodeIds) {
+// GridDhtPartitionMap nodeMap = node2part.get(nodeId);
+//
+// if (nodeMap.get(part) != EVICTED)
+// nodeMap.put(part, LOST);
+// }
+// }
}
if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST))
@@ -1440,86 +1537,83 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
/** {@inheritDoc} */
@Override public void resetLostPartitions() {
- lock.writeLock().lock();
-
- try {
- int parts = cctx.affinity().partitions();
- long updSeq = updateSeq.incrementAndGet();
-
- for (int part = 0; part < parts; part++) {
- Set<UUID> nodeIds = part2node.get(part);
-
- if (nodeIds != null) {
- boolean lost = false;
-
- for (UUID node : nodeIds) {
- GridDhtPartitionMap2 map = node2part.get(node);
-
- if (map.get(part) == LOST) {
- lost = true;
-
- break;
- }
- }
-
- if (lost) {
- GridDhtLocalPartition locPart = localPartition(part, topVer, false);
-
- if (locPart != null) {
- boolean marked = locPart.own();
-
- if (marked)
- updateLocal(locPart.id(), locPart.state(), updSeq);
- }
-
- for (UUID nodeId : nodeIds) {
- GridDhtPartitionMap2 nodeMap = node2part.get(nodeId);
-
- if (nodeMap.get(part) == LOST)
- nodeMap.put(part, OWNING);
- }
- }
- }
- }
-
- checkEvictions(updSeq, cctx.affinity().assignments(topVer));
-
- cctx.needsRecovery(false);
- }
- finally {
- lock.writeLock().unlock();
- }
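+ // TODO: re-implement without the removed part2node map; until then this method is a no-op. Previous logic kept below for reference.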
+
+// lock.writeLock().lock();
+//
+// try {
+// int parts = cctx.affinity().partitions();
+// long updSeq = updateSeq.incrementAndGet();
+//
+// for (int part = 0; part < parts; part++) {
+// Set<UUID> nodeIds = part2node.get(part);
+//
+// if (nodeIds != null) {
+// boolean lost = false;
+//
+// for (UUID node : nodeIds) {
+// GridDhtPartitionMap2 map = node2part.get(node);
+//
+// if (map.get(part) == LOST) {
+// lost = true;
+//
+// break;
+// }
+// }
+//
+// if (lost) {
+// GridDhtLocalPartition locPart = localPartition(part, topVer, false);
+//
+// if (locPart != null) {
+// boolean marked = locPart.own();
+//
+// if (marked)
+// updateLocal(locPart.id(), locPart.state(), updSeq);
+// }
+//
+// for (UUID nodeId : nodeIds) {
+// GridDhtPartitionMap2 nodeMap = node2part.get(nodeId);
+//
+// if (nodeMap.get(part) == LOST)
+// nodeMap.put(part, OWNING);
+// }
+// }
+// }
+// }
+//
+// checkEvictions(updSeq, cctx.affinity().assignments(topVer));
+//
+// cctx.needsRecovery(false);
+// }
+// finally {
+// lock.writeLock().unlock();
+// }
}
/** {@inheritDoc} */
@Override public Collection<Integer> lostPartitions() {
+ if (cctx.config().getPartitionLossPolicy() == PartitionLossPolicy.IGNORE)
+ return Collections.emptySet();
+
lock.readLock().lock();
try {
- Collection<Integer> res = null;
+ Set<Integer> res = null;
int parts = cctx.affinity().partitions();
- for (int part = 0; part < parts; part++) {
- Set<UUID> nodeIds = part2node.get(part);
-
- if (nodeIds != null) {
- for (UUID node : nodeIds) {
- GridDhtPartitionMap2 map = node2part.get(node);
-
- if (map.get(part) == LOST) {
- if (res == null)
- res = new ArrayList<>(parts - part);
+ for (GridDhtPartitionMap2 partMap : node2part.values()) {
+ for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) {
+ if (e.getValue() == LOST) {
+ if (res == null)
+ res = new HashSet<>(parts);
- res.add(part);
-
- break;
- }
+ res.add(e.getKey());
}
}
}
- return res == null ? Collections.<Integer>emptyList() : res;
+ return res == null ? Collections.<Integer>emptySet() : res;
}
finally {
lock.readLock().unlock();
@@ -1750,12 +1844,18 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
map.put(p, state);
- Set<UUID> ids = part2node.get(p);
+ if (state == MOVING || state == OWNING) {
+ AffinityAssignment assignment = cctx.affinity().assignment(diffFromAffinityVer);
- if (ids == null)
- part2node.put(p, ids = U.newHashSet(3));
+ if (!assignment.getIds(p).contains(cctx.localNodeId())) {
+ Set<UUID> diffIds = diffFromAffinity.get(p);
- ids.add(locNodeId);
+ if (diffIds == null)
+ diffFromAffinity.put(p, diffIds = U.newHashSet(3));
+
+ diffIds.add(cctx.localNodeId());
+ }
+ }
}
return updateSeq;
@@ -1787,14 +1887,10 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
if (parts != null) {
for (Integer p : parts.keySet()) {
- Set<UUID> nodeIds = part2node.get(p);
+ Set<UUID> diffIds = diffFromAffinity.get(p);
- if (nodeIds != null) {
- nodeIds.remove(nodeId);
-
- if (nodeIds.isEmpty())
- part2node.remove(p);
- }
+ if (diffIds != null)
+ diffIds.remove(nodeId);
}
}
@@ -1986,7 +2082,25 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
if (affNodes.isEmpty())
continue;
- List<ClusterNode> owners = owners(i);
+ Set<ClusterNode> owners = U.newHashSet(affNodes.size());
+
+ for (ClusterNode node : affNodes) {
+ if (hasState(i, node.id(), OWNING))
+ owners.add(node);
+ }
+
+ Set<UUID> diff = diffFromAffinity.get(i);
+
+ if (diff != null) {
+ for (UUID nodeId : diff) {
+ if (hasState(i, nodeId, OWNING)) {
+ ClusterNode node = cctx.discovery().node(nodeId);
+
+ if (node != null)
+ owners.add(node);
+ }
+ }
+ }
if (affNodes.size() != owners.size() || !owners.containsAll(affNodes))
return;
@@ -2035,30 +2149,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
* Checks consistency after all operations.
*/
private void consistencyCheck() {
- if (CONSISTENCY_CHECK) {
- if (node2part == null)
- return;
-
- for (Map.Entry<UUID, GridDhtPartitionMap2> e : node2part.entrySet()) {
- for (Integer p : e.getValue().keySet()) {
- Set<UUID> nodeIds = part2node.get(p);
-
- assert nodeIds != null : "Failed consistency check [part=" + p + ", nodeId=" + e.getKey() + ']';
- assert nodeIds.contains(e.getKey()) : "Failed consistency check [part=" + p + ", nodeId=" +
- e.getKey() + ", nodeIds=" + nodeIds + ']';
- }
- }
-
- for (Map.Entry<Integer, Set<UUID>> e : part2node.entrySet()) {
- for (UUID nodeId : e.getValue()) {
- GridDhtPartitionMap2 map = node2part.get(nodeId);
-
- assert map != null : "Failed consistency check [part=" + e.getKey() + ", nodeId=" + nodeId + ']';
- assert map.containsKey(e.getKey()) : "Failed consistency check [part=" + e.getKey() +
- ", nodeId=" + nodeId + ']';
- }
- }
- }
+ // no-op
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/58b6e05e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 078e67b5..28c3956 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1342,6 +1342,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
cctx.database().releaseHistoryForExchange();
+ if (err == null && realExchange) {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
+
+ cacheCtx.topology().onExchangeDone(cacheCtx.affinity().assignment(topologyVersion()));
+ }
+ }
+
if (super.onDone(res, err) && realExchange) {
exchLog.info("exchange finished [topVer=" + topologyVersion() +
", time1=" + duration() +