You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/11/20 14:55:30 UTC
[47/50] [abbrv] ignite git commit: ignite-10044
ignite-10044
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93b9967c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93b9967c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93b9967c
Branch: refs/heads/ignite-10044
Commit: 93b9967c410fa25c86024f1fe56f74443eaca27d
Parents: c634be2
Author: sboikov <sb...@apache.org>
Authored: Tue Nov 20 17:36:18 2018 +0300
Committer: sboikov <sb...@apache.org>
Committed: Tue Nov 20 17:36:18 2018 +0300
----------------------------------------------------------------------
.../GridDhtPartitionsExchangeFuture.java | 39 +++++++++---
.../topology/GridClientPartitionTopology.java | 66 ++++++++++++++++++--
2 files changed, 91 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/93b9967c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 72f01f0..3ce5cf1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1806,7 +1806,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Set<String> caches = exchActions.cachesToResetLostPartitions();
if (!F.isEmpty(caches))
- resetLostPartitions(caches);
+ resetLostPartitions(caches, false);
}
if (cctx.kernalContext().clientNode() || (dynamicCacheStartExchange() && exchangeLocE != null)) {
@@ -2078,7 +2078,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
if (serverNodeDiscoveryEvent() || localJoinExchange())
- detectLostPartitions(res);
+ detectLostPartitions(res, false);
Map<Integer, CacheGroupValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
@@ -2997,7 +2997,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
*
* @param resTopVer Result topology version.
*/
- private void detectLostPartitions(AffinityTopologyVersion resTopVer) {
+ private void detectLostPartitions(AffinityTopologyVersion resTopVer, boolean crd) {
boolean detected = false;
long time = System.currentTimeMillis();
@@ -3016,6 +3016,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
detected |= detectedOnGrp;
}
}
+
+ if (crd) {
+ for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
+ top.detectLostPartitions(resTopVer, null);
+ }
}
if (detected) {
@@ -3033,24 +3038,38 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @param cacheNames Cache names.
*/
- private void resetLostPartitions(Collection<String> cacheNames) {
+ private void resetLostPartitions(Collection<String> cacheNames, boolean crd) {
assert !exchCtx.mergeExchanges();
synchronized (cctx.exchange().interruptLock()) {
if (Thread.currentThread().isInterrupted())
return;
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (grp.isLocal())
- continue;
+ for (String cacheName : cacheNames) {
+ boolean found = false;
+
+ for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+ if (grp.isLocal())
+ continue;
- for (String cacheName : cacheNames) {
if (grp.hasCache(cacheName)) {
grp.topology().resetLostPartitions(initialVersion());
+ found = true;
+
break;
}
}
+
+ if (crd && !found) {
+ DynamicCacheDescriptor cacheDesc = cctx.affinity().caches().get(CU.cacheId(cacheName));
+
+ if (cacheDesc != null) {
+ GridDhtPartitionTopology top = cctx.exchange().clientTopology(cacheDesc.groupId(), context().events().discoveryCache());
+
+ top.resetLostPartitions(initialVersion());
+ }
+ }
}
}
}
@@ -3276,7 +3295,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Set<String> caches = exchActions.cachesToResetLostPartitions();
if (!F.isEmpty(caches))
- resetLostPartitions(caches);
+ resetLostPartitions(caches, true);
}
}
else if (discoveryCustomMessage instanceof SnapshotDiscoveryMessage
@@ -3288,7 +3307,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
assignPartitionsStates();
if (exchCtx.events().hasServerLeft())
- detectLostPartitions(resTopVer);
+ detectLostPartitions(resTopVer, true);
}
// Recalculate new affinity based on partitions availability.
http://git-wip-us.apache.org/repos/asf/ignite/blob/93b9967c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index 01db508..1a99e59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -26,13 +26,16 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
@@ -59,7 +62,9 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
@@ -96,7 +101,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
private AffinityTopologyVersion lastExchangeVer;
/** */
- private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
+ private AffinityTopologyVersion topVer;
/** */
private volatile boolean stopping;
@@ -125,6 +130,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** */
private volatile Map<Integer, Long> globalPartSizes;
+ /** */
+ private TreeSet<Integer> lostParts;
+
/**
* @param cctx Context.
* @param discoCache Discovery data cache.
@@ -998,14 +1006,53 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public boolean detectLostPartitions(AffinityTopologyVersion affVer, DiscoveryEvent discoEvt) {
- assert false : "detectLostPartitions should never be called on client topology";
+ lock.writeLock().lock();
- return false;
+
+ boolean changed = false;
+
+ try {
+ for (int part = 0; part < parts; part++) {
+ boolean lost = F.contains(lostParts, part);
+
+ if (!lost) {
+ boolean hasOwner = false;
+
+ for (GridDhtPartitionMap partMap : node2part.values()) {
+ if (partMap.get(part) == OWNING) {
+ hasOwner = true;
+ break;
+ }
+ }
+
+ if (!hasOwner) {
+ if (lostParts == null)
+ lostParts = new TreeSet<>();
+
+ changed = true;
+
+ lostParts.add(part);
+ }
+ }
+ }
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+
+ return changed;
}
/** {@inheritDoc} */
@Override public void resetLostPartitions(AffinityTopologyVersion affVer) {
- assert false : "resetLostPartitions should never be called on client topology";
+ lock.writeLock().lock();
+
+ try {
+ lostParts = null;
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
}
/** {@inheritDoc} */
@@ -1208,6 +1255,17 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
}
+ if (lostParts != null) {
+ for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+ for (Integer part : lostParts) {
+ GridDhtPartitionState state = e.getValue().get(part);
+
+ if (state != null && state.active())
+ e.getValue().put(part, LOST);
+ }
+ }
+ }
+
for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet())
part2node.put(entry.getKey(), entry.getValue());