You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/29 14:38:29 UTC

[15/16] ignite git commit: gg-12688 : Fixed updateSequence in ClientTopology.

gg-12688 : Fixed updateSequence in ClientTopology.

Signed-off-by: Andrey Gura <ag...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f589cc84
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f589cc84
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f589cc84

Branch: refs/heads/master
Commit: f589cc847e3f0af8c90b519b26c0818f9cee2822
Parents: c6b7011
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Mon Aug 28 20:31:18 2017 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Tue Aug 29 17:32:04 2017 +0300

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        | 18 +++++---
 .../GridDhtPartitionsExchangeFuture.java        | 15 +++++--
 .../IgnitePdsCacheRebalancingAbstractTest.java  | 44 +++++++++++++++++++-
 3 files changed, 66 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f589cc84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index c8856fd..299394f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -1092,20 +1092,26 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
         try {
             for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
-                if (!e.getValue().containsKey(p))
+                GridDhtPartitionMap partMap = e.getValue();
+
+                if (!partMap.containsKey(p))
                     continue;
 
-                if (e.getValue().get(p) == OWNING && !owners.contains(e.getKey())) {
+                if (partMap.get(p) == OWNING && !owners.contains(e.getKey())) {
                     if (haveHistory)
-                        e.getValue().put(p, MOVING);
+                        partMap.put(p, MOVING);
                     else {
-                        e.getValue().put(p, RENTING);
+                        partMap.put(p, RENTING);
 
                         result.add(e.getKey());
                     }
+
+                    partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());
+
+                    U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " +
+                        "[nodeId=" + e.getKey() + ", groupId=" + grpId +
+                        ", partId=" + p + ", haveHistory=" + haveHistory + "]");
                 }
-                else if (owners.contains(e.getKey()))
-                    e.getValue().put(p, OWNING);
             }
 
             part2node.put(p, owners);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f589cc84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 299284d..240b5f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2437,11 +2437,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      */
     private void assignPartitionsStates() {
         if (cctx.database().persistenceEnabled()) {
-            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                if (grp.isLocal())
+            for (Map.Entry<Integer, CacheGroupDescriptor> e : cctx.affinity().cacheGroups().entrySet()) {
+                if (e.getValue().config().getCacheMode() == CacheMode.LOCAL)
                     continue;
 
-                assignPartitionStates(grp.topology());
+                GridDhtPartitionTopology top;
+
+                CacheGroupContext grpCtx = cctx.cache().cacheGroup(e.getKey());
+
+                if (grpCtx != null)
+                    top = grpCtx.topology();
+                else
+                    top = cctx.exchange().clientTopology(e.getKey());
+
+                assignPartitionStates(top);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f589cc84/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
index 7b047f8..2d237cb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.MemoryConfiguration;
@@ -46,6 +47,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -96,7 +98,19 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
 
         ccfg2.setQueryEntities(Collections.singleton(qryEntity));
 
-        cfg.setCacheConfiguration(ccfg1, ccfg2);
+        // Do not start filtered cache on coordinator.
+        if (gridName.endsWith("0")) {
+            cfg.setCacheConfiguration(ccfg1, ccfg2);
+        }
+        else {
+            CacheConfiguration ccfg3 = cacheConfiguration("filtered");
+            ccfg3.setPartitionLossPolicy(PartitionLossPolicy.READ_ONLY_SAFE);
+            ccfg3.setBackups(1);
+            ccfg3.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            ccfg3.setNodeFilter(new CoordinatorNodeFilter());
+
+            cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3);
+        }
 
         MemoryConfiguration memCfg = new MemoryConfiguration();
 
@@ -501,7 +515,23 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
      * @throws Exception If failed.
      */
     public void testForceRebalance() throws Exception {
-        final Ignite ig = startGrids(4);
+        testForceRebalance(cacheName);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testForceRebalanceClientTopology() throws Exception {
+        testForceRebalance("filtered");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testForceRebalance(String cacheName) throws Exception {
+        startGrids(4);
+
+        final Ignite ig = grid(1);
 
         ig.active(true);
 
@@ -653,4 +683,14 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
                 '}';
         }
     }
+
+    /**
+     *
+     */
+    private static class CoordinatorNodeFilter implements IgnitePredicate<ClusterNode> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            return node.order() > 1;
+        }
+    }
 }