You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/11/20 14:55:30 UTC

[47/50] [abbrv] ignite git commit: ignite-10044

ignite-10044


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93b9967c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93b9967c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93b9967c

Branch: refs/heads/ignite-10044
Commit: 93b9967c410fa25c86024f1fe56f74443eaca27d
Parents: c634be2
Author: sboikov <sb...@apache.org>
Authored: Tue Nov 20 17:36:18 2018 +0300
Committer: sboikov <sb...@apache.org>
Committed: Tue Nov 20 17:36:18 2018 +0300

----------------------------------------------------------------------
 .../GridDhtPartitionsExchangeFuture.java        | 39 +++++++++---
 .../topology/GridClientPartitionTopology.java   | 66 ++++++++++++++++++--
 2 files changed, 91 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/93b9967c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 72f01f0..3ce5cf1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1806,7 +1806,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             Set<String> caches = exchActions.cachesToResetLostPartitions();
 
             if (!F.isEmpty(caches))
-                resetLostPartitions(caches);
+                resetLostPartitions(caches, false);
         }
 
         if (cctx.kernalContext().clientNode() || (dynamicCacheStartExchange() && exchangeLocE != null)) {
@@ -2078,7 +2078,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             }
 
             if (serverNodeDiscoveryEvent() || localJoinExchange())
-                detectLostPartitions(res);
+                detectLostPartitions(res, false);
 
             Map<Integer, CacheGroupValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
 
@@ -2997,7 +2997,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      *
      * @param resTopVer Result topology version.
      */
-    private void detectLostPartitions(AffinityTopologyVersion resTopVer) {
+    private void detectLostPartitions(AffinityTopologyVersion resTopVer, boolean crd) {
         boolean detected = false;
 
         long time = System.currentTimeMillis();
@@ -3016,6 +3016,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     detected |= detectedOnGrp;
                 }
             }
+
+            if (crd) {
+                for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
+                    top.detectLostPartitions(resTopVer, null);
+            }
         }
 
         if (detected) {
@@ -3033,24 +3038,38 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /**
      * @param cacheNames Cache names.
      */
-    private void resetLostPartitions(Collection<String> cacheNames) {
+    private void resetLostPartitions(Collection<String> cacheNames, boolean crd) {
         assert !exchCtx.mergeExchanges();
 
         synchronized (cctx.exchange().interruptLock()) {
             if (Thread.currentThread().isInterrupted())
                 return;
 
-            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                if (grp.isLocal())
-                    continue;
+            for (String cacheName : cacheNames) {
+                boolean found = false;
+
+                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                    if (grp.isLocal())
+                        continue;
 
-                for (String cacheName : cacheNames) {
                     if (grp.hasCache(cacheName)) {
                         grp.topology().resetLostPartitions(initialVersion());
 
+                        found = true;
+
                         break;
                     }
                 }
+
+                if (crd && !found) {
+                    DynamicCacheDescriptor cacheDesc = cctx.affinity().caches().get(CU.cacheId(cacheName));
+
+                    if (cacheDesc != null) {
+                        GridDhtPartitionTopology top = cctx.exchange().clientTopology(cacheDesc.groupId(), context().events().discoveryCache());
+
+                        top.resetLostPartitions(initialVersion());
+                    }
+                }
             }
         }
     }
@@ -3276,7 +3295,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         Set<String> caches = exchActions.cachesToResetLostPartitions();
 
                         if (!F.isEmpty(caches))
-                            resetLostPartitions(caches);
+                            resetLostPartitions(caches, true);
                     }
                 }
                 else if (discoveryCustomMessage instanceof SnapshotDiscoveryMessage
@@ -3288,7 +3307,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     assignPartitionsStates();
 
                 if (exchCtx.events().hasServerLeft())
-                    detectLostPartitions(resTopVer);
+                    detectLostPartitions(resTopVer, true);
             }
 
             // Recalculate new affinity based on partitions availability.

http://git-wip-us.apache.org/repos/asf/ignite/blob/93b9967c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index 01db508..1a99e59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -26,13 +26,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
@@ -59,7 +62,9 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 
@@ -96,7 +101,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     private AffinityTopologyVersion lastExchangeVer;
 
     /** */
-    private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
+    private AffinityTopologyVersion topVer;
 
     /** */
     private volatile boolean stopping;
@@ -125,6 +130,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /** */
     private volatile Map<Integer, Long> globalPartSizes;
 
+    /** */
+    private TreeSet<Integer> lostParts;
+
     /**
      * @param cctx Context.
      * @param discoCache Discovery data cache.
@@ -998,14 +1006,53 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public boolean detectLostPartitions(AffinityTopologyVersion affVer, DiscoveryEvent discoEvt) {
-        assert false : "detectLostPartitions should never be called on client topology";
+        lock.writeLock().lock();
 
-        return false;
+
+        boolean changed = false;
+
+        try {
+            for (int part = 0; part < parts; part++) {
+                boolean lost = F.contains(lostParts, part);
+
+                if (!lost) {
+                    boolean hasOwner = false;
+
+                    for (GridDhtPartitionMap partMap : node2part.values()) {
+                        if (partMap.get(part) == OWNING) {
+                            hasOwner = true;
+                            break;
+                        }
+                    }
+
+                    if (!hasOwner) {
+                        if (lostParts == null)
+                            lostParts = new TreeSet<>();
+
+                        changed = true;
+
+                        lostParts.add(part);
+                    }
+                }
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+
+        return changed;
     }
 
     /** {@inheritDoc} */
     @Override public void resetLostPartitions(AffinityTopologyVersion affVer) {
-        assert false : "resetLostPartitions should never be called on client topology";
+        lock.writeLock().lock();
+
+        try {
+            lostParts = null;
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
     }
 
     /** {@inheritDoc} */
@@ -1208,6 +1255,17 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                 }
             }
 
+            if (lostParts != null) {
+                for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+                    for (Integer part : lostParts) {
+                        GridDhtPartitionState state = e.getValue().get(part);
+
+                        if (state != null && state.active())
+                            e.getValue().put(part, LOST);
+                    }
+                }
+            }
+
             for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet())
                 part2node.put(entry.getKey(), entry.getValue());