You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/11/15 14:14:43 UTC
ignite git commit: ignite-10043
Repository: ignite
Updated Branches:
refs/heads/ignite-10043 [created] 5f6141927
ignite-10043
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f614192
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f614192
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f614192
Branch: refs/heads/ignite-10043
Commit: 5f61419270ca4813e8d0edbb4c23341591fa1816
Parents: 3c54f38
Author: sboikov <sb...@apache.org>
Authored: Thu Nov 15 17:14:21 2018 +0300
Committer: sboikov <sb...@apache.org>
Committed: Thu Nov 15 17:14:21 2018 +0300
----------------------------------------------------------------------
.../GridDhtPartitionsExchangeFuture.java | 5 +-
.../dht/topology/GridDhtPartitionTopology.java | 5 +-
.../topology/GridDhtPartitionTopologyImpl.java | 115 ++++++++-----------
.../IgniteCachePartitionLossPolicySelfTest.java | 6 +-
4 files changed, 57 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f614192/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3702a51..2499c8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -3008,7 +3008,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (!grp.isLocal()) {
- boolean detectedOnGrp = grp.topology().detectLostPartitions(resTopVer, events().lastEvent());
+ // Do not trigger lost partition events on start.
+ boolean event = !localJoinExchange() && !activateCluster();
+
+ boolean detectedOnGrp = grp.topology().detectLostPartitions(resTopVer, event ? events().lastEvent() : null);
detected |= detectedOnGrp;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f614192/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index 25b284e..bdf16da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
@@ -327,10 +328,10 @@ public interface GridDhtPartitionTopology {
* This method should be called on topology coordinator after all partition messages are received.
*
* @param resTopVer Exchange result version.
- * @param discoEvt Discovery event for which we detect lost partitions.
+ * @param discoEvt Discovery event for which we detect lost partitions if {@link EventType#EVT_CACHE_REBALANCE_PART_DATA_LOST} event should be fired.
* @return {@code True} if partitions state got updated.
*/
- public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, DiscoveryEvent discoEvt);
+ public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, @Nullable DiscoveryEvent discoEvt);
/**
* Resets the state of all LOST partitions to OWNING.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f614192/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 45f3282..51e6bec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
@@ -110,8 +111,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** Node to partition map. */
private GridDhtPartitionFullMap node2part;
- /** Partitions map for left nodes. */
- private GridDhtPartitionFullMap leftNode2Part = new GridDhtPartitionFullMap();
+ /** */
+ private Set<Integer> lostParts;
/** */
private final Map<Integer, Set<UUID>> diffFromAffinity = new HashMap<>();
@@ -1467,13 +1468,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.trace("Removing left node from full map update [grp=" + grp.cacheOrGroupName() +
", nodeId=" + nodeId + ", partMap=" + partMap + ']');
- if (node2part.containsKey(nodeId)) {
- GridDhtPartitionMap map = partMap.get(nodeId);
-
- if (map != null)
- leftNode2Part.put(nodeId, map);
- }
-
it.remove();
}
}
@@ -2003,7 +1997,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, DiscoveryEvent discoEvt) {
+ @Override public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, @Nullable DiscoveryEvent discoEvt) {
ctx.database().checkpointReadLock();
try {
@@ -2013,48 +2007,55 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (node2part == null)
return false;
+ PartitionLossPolicy plc = grp.config().getPartitionLossPolicy();
+
+ assert plc != null;
+
int parts = grp.affinity().partitions();
- Set<Integer> lost = new HashSet<>(parts);
+ Set<Integer> recentlyLost = null;
- for (int p = 0; p < parts; p++)
- lost.add(p);
+ boolean changed = false;
- for (GridDhtPartitionMap partMap : node2part.values()) {
- for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) {
- if (e.getValue() == OWNING) {
- lost.remove(e.getKey());
+ for (int part = 0; part < parts; part++) {
+ boolean lost = F.contains(lostParts, part);
- if (lost.isEmpty())
+ if (!lost) {
+ boolean hasOwner = false;
+
+ for (GridDhtPartitionMap partMap : node2part.values()) {
+ if (partMap.get(part) == OWNING) {
+ hasOwner = true;
break;
+ }
}
- }
- }
- boolean changed = false;
+ if (!hasOwner) {
+ lost = true;
- if (!F.isEmpty(lost)) {
- PartitionLossPolicy plc = grp.config().getPartitionLossPolicy();
+ if (lostParts == null)
+ lostParts = new TreeSet<>();
- assert plc != null;
+ lostParts.add(part);
- Set<Integer> recentlyLost = new HashSet<>();
+ if (discoEvt != null) {
+ if (recentlyLost == null)
+ recentlyLost = new HashSet<>();
- for (Map.Entry<UUID, GridDhtPartitionMap> leftEntry : leftNode2Part.entrySet()) {
- for (Map.Entry<Integer, GridDhtPartitionState> entry : leftEntry.getValue().entrySet()) {
- if (entry.getValue() == OWNING)
- recentlyLost.add(entry.getKey());
- }
- }
+ recentlyLost.add(part);
- if (!recentlyLost.isEmpty()) {
- U.warn(log, "Detected lost partitions [grp=" + grp.cacheOrGroupName()
- + ", parts=" + S.compact(recentlyLost)
- + ", plc=" + plc + "]");
+ if (grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+ grp.addRebalanceEvent(part,
+ EVT_CACHE_REBALANCE_PART_DATA_LOST,
+ discoEvt.eventNode(),
+ discoEvt.type(),
+ discoEvt.timestamp());
+ }
+ }
+ }
}
- // Update partition state on all nodes.
- for (Integer part : lost) {
+ if (lost) {
long updSeq = updateSeq.incrementAndGet();
GridDhtLocalPartition locPart = localPartition(part, resTopVer, false, true);
@@ -2080,21 +2081,17 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
e.getValue().put(part, LOST);
}
}
-
- if (recentlyLost.contains(part) && grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
- grp.addRebalanceEvent(part,
- EVT_CACHE_REBALANCE_PART_DATA_LOST,
- discoEvt.eventNode(),
- discoEvt.type(),
- discoEvt.timestamp());
- }
}
+ }
- if (plc != PartitionLossPolicy.IGNORE)
- grp.needsRecovery(true);
+ if (recentlyLost != null) {
+ U.warn(log, "Detected lost partitions [grp=" + grp.cacheOrGroupName()
+ + ", parts=" + S.compact(recentlyLost)
+ + ", plc=" + plc + "]");
}
- leftNode2Part.clear();
+ if (lostParts != null && plc != PartitionLossPolicy.IGNORE)
+ grp.needsRecovery(true);
return changed;
}
@@ -2144,6 +2141,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
checkEvictions(updSeq, grp.affinity().readyAffinity(resTopVer));
+ lostParts = null;
+
grp.needsRecovery(false);
}
finally {
@@ -2163,22 +2162,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
lock.readLock().lock();
try {
- Set<Integer> res = null;
-
- int parts = grp.affinity().partitions();
-
- for (GridDhtPartitionMap partMap : node2part.values()) {
- for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) {
- if (e.getValue() == LOST) {
- if (res == null)
- res = new HashSet<>(parts);
-
- res.add(e.getKey());
- }
- }
- }
-
- return res == null ? Collections.<Integer>emptySet() : res;
+ return lostParts == null ? Collections.<Integer>emptySet() : new HashSet<>(lostParts);
}
finally {
lock.readLock().unlock();
@@ -2533,9 +2517,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
GridDhtPartitionMap parts = node2part.remove(nodeId);
- if (parts != null)
- leftNode2Part.put(nodeId, parts);
-
if (!grp.isReplicated()) {
if (parts != null) {
for (Integer p : parts.keySet()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f614192/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
index caf0829..a016865 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
@@ -295,8 +295,6 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
* @throws Exception if failed.
*/
public void testReadWriteSafeWithBackupsAfterKillThreeNodesWithPersistence() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-10043");
-
partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
backups = 1;
@@ -886,12 +884,12 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
for (Map<Integer, Semaphore> map : lostMap) {
for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
- assertTrue("Failed to wait for partition LOST event for partition:" + entry.getKey(), entry.getValue().tryAcquire(1));
+ assertTrue("Failed to wait for partition LOST event for partition: " + entry.getKey(), entry.getValue().tryAcquire(1));
}
for (Map<Integer, Semaphore> map : lostMap) {
for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
- assertFalse("Partition LOST event raised twice for partition:" + entry.getKey(), entry.getValue().tryAcquire(1));
+ assertFalse("Partition LOST event raised twice for partition: " + entry.getKey(), entry.getValue().tryAcquire(1));
}
return parts;