You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/25 14:45:52 UTC

ignite git commit: IGNITE-8324 Update disco cache for affinity topology together with topology version - Fixes #3880.

Repository: ignite
Updated Branches:
  refs/heads/master 95fc14903 -> 36e8cf54a


IGNITE-8324 Update disco cache for affinity topology together with topology version - Fixes #3880.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/36e8cf54
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/36e8cf54
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/36e8cf54

Branch: refs/heads/master
Commit: 36e8cf54a4acb2e43731d17f22fd6f50dc5d0e5f
Parents: 95fc149
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Wed Apr 25 17:45:28 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Apr 25 17:45:28 2018 +0300

----------------------------------------------------------------------
 .../dht/ClientCacheDhtTopologyFuture.java       |  3 +-
 .../dht/GridClientPartitionTopology.java        |  6 +---
 .../dht/GridDhtPartitionTopologyImpl.java       | 15 ++++----
 .../GridDhtPartitionsExchangeFuture.java        | 21 -----------
 .../distributed/CacheExchangeMergeTest.java     | 38 ++++++++++++++++++++
 ...gnitePdsCacheAssignmentNodeRestartsTest.java | 24 +++++++------
 6 files changed, 63 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
index 317037b..4b48f5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
@@ -27,8 +27,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 /**
  * Topology future created for client cache start.
  */
-public class ClientCacheDhtTopologyFuture extends GridDhtTopologyFutureAdapter
-    implements GridDhtTopologyFuture {
+public class ClientCacheDhtTopologyFuture extends GridDhtTopologyFutureAdapter {
     /** */
     final AffinityTopologyVersion topVer;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index dcb8b96..cf8fc34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -207,11 +207,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         try {
             AffinityTopologyVersion exchTopVer = exchFut.initialVersion();
 
-            // Update is correct if topology version is newer or in case of newer discovery caches.
-            boolean isCorrectUpdate = exchTopVer.compareTo(topVer) > 0
-                    || (exchTopVer.compareTo(topVer) == 0 && this.discoCache != null && discoCache.version().compareTo(this.discoCache.version()) > 0);
-
-            assert isCorrectUpdate : "Invalid topology version [grp=" + grpId +
+            assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [grp=" + grpId +
                 ", topVer=" + topVer +
                 ", exchVer=" + exchTopVer +
                 ", discoCacheVer=" + (this.discoCache != null ? this.discoCache.version() : "None") +

http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 3d664f3..2c47315 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -241,11 +241,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         try {
             AffinityTopologyVersion exchTopVer = exchFut.initialVersion();
 
-            // Update is correct if topology version is newer or in case of newer discovery caches.
-            boolean isCorrectUpdate = exchTopVer.compareTo(readyTopVer) > 0
-                    || (exchTopVer.compareTo(readyTopVer) == 0 && this.discoCache != null && discoCache.version().compareTo(this.discoCache.version()) > 0);
-
-            assert isCorrectUpdate : "Invalid topology version [grp=" + grp.cacheOrGroupName() +
+            assert exchTopVer.compareTo(readyTopVer) > 0 : "Invalid topology version [grp=" + grp.cacheOrGroupName() +
                 ", topVer=" + readyTopVer +
                 ", exchTopVer=" + exchTopVer +
                 ", discoCacheVer=" + (this.discoCache != null ? this.discoCache.version() : "None") +
@@ -489,6 +485,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                             ", evtsVer=" + evts.topologyVersion() + ']';
 
                         lastTopChangeVer = readyTopVer = evts.topologyVersion();
+
+                        discoCache = evts.discoveryCache();
                     }
 
                     if (log.isDebugEnabled()) {
@@ -1821,7 +1819,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, AffinityAssignment assignment, boolean updateRebalanceVer) {
+    @Override public void onExchangeDone(@Nullable GridDhtPartitionsExchangeFuture fut,
+                                         AffinityAssignment assignment,
+                                         boolean updateRebalanceVer) {
         lock.writeLock().lock();
 
         try {
@@ -1830,6 +1830,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             readyTopVer = lastTopChangeVer = assignment.topologyVersion();
 
+            if (fut != null)
+                discoCache = fut.events().discoveryCache();
+
             if (!grp.isReplicated()) {
                 boolean rebuildDiff = fut == null || fut.localJoinExchange() || fut.serverNodeDiscoveryEvent() ||
                     fut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || !diffFromAffinityVer.initialized();

http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 56907b2..33bd989 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1919,18 +1919,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     * Checks that some futures were merged to the current.
-     * Future without merges has only one DiscoveryEvent.
-     * If we merge futures to the current (see {@link GridCachePartitionExchangeManager#mergeExchanges(GridDhtPartitionsExchangeFuture, GridDhtPartitionsFullMessage)})
-     * we add new discovery event from merged future.
-     *
-     * @return {@code True} If some futures were merged to current, false in other case.
-     */
-    private boolean hasMergedExchanges() {
-        return context().events().events().size() > 1;
-    }
-
-    /**
      * @param fut Current future.
      * @return Pending join request if any.
      */
@@ -2475,12 +2463,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 boolean finish = cctx.exchange().mergeExchangesOnCoordinator(this);
 
-                // Synchronize in case of changed coordinator (thread switched to sys-*)
-                synchronized (mux) {
-                    if (hasMergedExchanges())
-                        updateTopologies(true);
-                }
-
                 if (!finish)
                     return;
             }
@@ -3116,9 +3098,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         return; // Node is stopping, no need to further process exchange.
                     }
 
-                    if (hasMergedExchanges())
-                        updateTopologies(false);
-
                     assert resTopVer.equals(exchCtx.events().topologyVersion()) :  "Unexpected result version [" +
                         "msgVer=" + resTopVer +
                         ", locVer=" + exchCtx.events().topologyVersion() + ']';

http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 9660a76..6c714b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -1190,6 +1190,8 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         awaitPartitionMapExchange();
 
+        checkTopologiesConsistency();
+
         checkCaches0();
     }
 
@@ -1206,6 +1208,42 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks that after exchange all nodes have consistent state about partition owners.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkTopologiesConsistency() throws Exception {
+        List<Ignite> nodes = G.allGrids();
+
+        IgniteEx crdNode = null;
+
+        for (Ignite node : nodes) {
+            ClusterNode locNode = node.cluster().localNode();
+
+            if (crdNode == null || locNode.order() < crdNode.localNode().order())
+                crdNode = (IgniteEx) node;
+        }
+
+        for (Ignite node : nodes) {
+            IgniteEx node0 = (IgniteEx) node;
+
+            if (node0.localNode().id().equals(crdNode.localNode().id()))
+                continue;
+
+            for (IgniteInternalCache cache : node0.context().cache().caches()) {
+                int partitions = cache.context().affinity().partitions();
+                for (int p = 0; p < partitions; p++) {
+                    List<ClusterNode> crdOwners = crdNode.cachex(cache.name()).cache().context().topology().owners(p);
+
+                    List<ClusterNode> owners = cache.context().topology().owners(p);
+
+                    assertEquals(crdOwners, owners);
+                }
+            }
+        }
+    }
+
+    /**
      * @throws Exception If failed.
      */
     private void checkAffinity() throws Exception {

http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java
index 0b8f15a..c57165c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java
@@ -29,10 +29,9 @@ import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.MemoryConfiguration;
-import org.apache.ignite.configuration.MemoryPolicyConfiguration;
-import org.apache.ignite.configuration.PersistentStoreConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -41,7 +40,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -65,11 +63,17 @@ public class IgnitePdsCacheAssignmentNodeRestartsTest extends GridCommonAbstract
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        cfg.setMemoryConfiguration(new MemoryConfiguration().setDefaultMemoryPolicyName("d").
-            setPageSize(1024).setMemoryPolicies(new MemoryPolicyConfiguration().setName("d").
-            setInitialSize(50 * 1024 * 1024L).setMaxSize(50 * 1024 * 1024)));
+        cfg.setConsistentId(igniteInstanceName);
 
-        cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration().setWalMode(WALMode.LOG_ONLY));
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setPageSize(1024)
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(true)
+                .setInitialSize(50 * 1024 * 1024L)
+                .setMaxSize(50 * 1024 * 1024L)
+            )
+            .setWalMode(WALMode.LOG_ONLY)
+        );
 
         ((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
@@ -80,14 +84,14 @@ public class IgnitePdsCacheAssignmentNodeRestartsTest extends GridCommonAbstract
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
-        U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+        cleanPersistenceDir();
     }
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         stopAllGrids();
 
-        U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+        cleanPersistenceDir();
 
         super.afterTest();
     }