You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/25 14:45:52 UTC
ignite git commit: IGNITE-8324 Update disco cache for affinity topology together with topology version - Fixes #3880.
Repository: ignite
Updated Branches:
refs/heads/master 95fc14903 -> 36e8cf54a
IGNITE-8324 Update disco cache for affinity topology together with topology version - Fixes #3880.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/36e8cf54
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/36e8cf54
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/36e8cf54
Branch: refs/heads/master
Commit: 36e8cf54a4acb2e43731d17f22fd6f50dc5d0e5f
Parents: 95fc149
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Wed Apr 25 17:45:28 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Apr 25 17:45:28 2018 +0300
----------------------------------------------------------------------
.../dht/ClientCacheDhtTopologyFuture.java | 3 +-
.../dht/GridClientPartitionTopology.java | 6 +---
.../dht/GridDhtPartitionTopologyImpl.java | 15 ++++----
.../GridDhtPartitionsExchangeFuture.java | 21 -----------
.../distributed/CacheExchangeMergeTest.java | 38 ++++++++++++++++++++
...gnitePdsCacheAssignmentNodeRestartsTest.java | 24 +++++++------
6 files changed, 63 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
index 317037b..4b48f5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
@@ -27,8 +27,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Topology future created for client cache start.
*/
-public class ClientCacheDhtTopologyFuture extends GridDhtTopologyFutureAdapter
- implements GridDhtTopologyFuture {
+public class ClientCacheDhtTopologyFuture extends GridDhtTopologyFutureAdapter {
/** */
final AffinityTopologyVersion topVer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index dcb8b96..cf8fc34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -207,11 +207,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
try {
AffinityTopologyVersion exchTopVer = exchFut.initialVersion();
- // Update is correct if topology version is newer or in case of newer discovery caches.
- boolean isCorrectUpdate = exchTopVer.compareTo(topVer) > 0
- || (exchTopVer.compareTo(topVer) == 0 && this.discoCache != null && discoCache.version().compareTo(this.discoCache.version()) > 0);
-
- assert isCorrectUpdate : "Invalid topology version [grp=" + grpId +
+ assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [grp=" + grpId +
", topVer=" + topVer +
", exchVer=" + exchTopVer +
", discoCacheVer=" + (this.discoCache != null ? this.discoCache.version() : "None") +
http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 3d664f3..2c47315 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -241,11 +241,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
AffinityTopologyVersion exchTopVer = exchFut.initialVersion();
- // Update is correct if topology version is newer or in case of newer discovery caches.
- boolean isCorrectUpdate = exchTopVer.compareTo(readyTopVer) > 0
- || (exchTopVer.compareTo(readyTopVer) == 0 && this.discoCache != null && discoCache.version().compareTo(this.discoCache.version()) > 0);
-
- assert isCorrectUpdate : "Invalid topology version [grp=" + grp.cacheOrGroupName() +
+ assert exchTopVer.compareTo(readyTopVer) > 0 : "Invalid topology version [grp=" + grp.cacheOrGroupName() +
", topVer=" + readyTopVer +
", exchTopVer=" + exchTopVer +
", discoCacheVer=" + (this.discoCache != null ? this.discoCache.version() : "None") +
@@ -489,6 +485,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
", evtsVer=" + evts.topologyVersion() + ']';
lastTopChangeVer = readyTopVer = evts.topologyVersion();
+
+ discoCache = evts.discoveryCache();
}
if (log.isDebugEnabled()) {
@@ -1821,7 +1819,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, AffinityAssignment assignment, boolean updateRebalanceVer) {
+ @Override public void onExchangeDone(@Nullable GridDhtPartitionsExchangeFuture fut,
+ AffinityAssignment assignment,
+ boolean updateRebalanceVer) {
lock.writeLock().lock();
try {
@@ -1830,6 +1830,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
readyTopVer = lastTopChangeVer = assignment.topologyVersion();
+ if (fut != null)
+ discoCache = fut.events().discoveryCache();
+
if (!grp.isReplicated()) {
boolean rebuildDiff = fut == null || fut.localJoinExchange() || fut.serverNodeDiscoveryEvent() ||
fut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || !diffFromAffinityVer.initialized();
http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 56907b2..33bd989 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1919,18 +1919,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- * Checks that some futures were merged to the current.
- * Future without merges has only one DiscoveryEvent.
- * If we merge futures to the current (see {@link GridCachePartitionExchangeManager#mergeExchanges(GridDhtPartitionsExchangeFuture, GridDhtPartitionsFullMessage)})
- * we add new discovery event from merged future.
- *
- * @return {@code True} If some futures were merged to current, false in other case.
- */
- private boolean hasMergedExchanges() {
- return context().events().events().size() > 1;
- }
-
- /**
* @param fut Current future.
* @return Pending join request if any.
*/
@@ -2475,12 +2463,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
boolean finish = cctx.exchange().mergeExchangesOnCoordinator(this);
- // Synchronize in case of changed coordinator (thread switched to sys-*)
- synchronized (mux) {
- if (hasMergedExchanges())
- updateTopologies(true);
- }
-
if (!finish)
return;
}
@@ -3116,9 +3098,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
return; // Node is stopping, no need to further process exchange.
}
- if (hasMergedExchanges())
- updateTopologies(false);
-
assert resTopVer.equals(exchCtx.events().topologyVersion()) : "Unexpected result version [" +
"msgVer=" + resTopVer +
", locVer=" + exchCtx.events().topologyVersion() + ']';
http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 9660a76..6c714b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -1190,6 +1190,8 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
awaitPartitionMapExchange();
+ checkTopologiesConsistency();
+
checkCaches0();
}
@@ -1206,6 +1208,42 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
}
/**
+ * Checks that after exchange all nodes have consistent state about partition owners.
+ *
+ * @throws Exception If failed.
+ */
+ private void checkTopologiesConsistency() throws Exception {
+ List<Ignite> nodes = G.allGrids();
+
+ IgniteEx crdNode = null;
+
+ for (Ignite node : nodes) {
+ ClusterNode locNode = node.cluster().localNode();
+
+ if (crdNode == null || locNode.order() < crdNode.localNode().order())
+ crdNode = (IgniteEx) node;
+ }
+
+ for (Ignite node : nodes) {
+ IgniteEx node0 = (IgniteEx) node;
+
+ if (node0.localNode().id().equals(crdNode.localNode().id()))
+ continue;
+
+ for (IgniteInternalCache cache : node0.context().cache().caches()) {
+ int partitions = cache.context().affinity().partitions();
+ for (int p = 0; p < partitions; p++) {
+ List<ClusterNode> crdOwners = crdNode.cachex(cache.name()).cache().context().topology().owners(p);
+
+ List<ClusterNode> owners = cache.context().topology().owners(p);
+
+ assertEquals(crdOwners, owners);
+ }
+ }
+ }
+ }
+
+ /**
* @throws Exception If failed.
*/
private void checkAffinity() throws Exception {
http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java
index 0b8f15a..c57165c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java
@@ -29,10 +29,9 @@ import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.MemoryConfiguration;
-import org.apache.ignite.configuration.MemoryPolicyConfiguration;
-import org.apache.ignite.configuration.PersistentStoreConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -41,7 +40,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -65,11 +63,17 @@ public class IgnitePdsCacheAssignmentNodeRestartsTest extends GridCommonAbstract
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- cfg.setMemoryConfiguration(new MemoryConfiguration().setDefaultMemoryPolicyName("d").
- setPageSize(1024).setMemoryPolicies(new MemoryPolicyConfiguration().setName("d").
- setInitialSize(50 * 1024 * 1024L).setMaxSize(50 * 1024 * 1024)));
+ cfg.setConsistentId(igniteInstanceName);
- cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration().setWalMode(WALMode.LOG_ONLY));
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setPageSize(1024)
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setInitialSize(50 * 1024 * 1024L)
+ .setMaxSize(50 * 1024 * 1024L)
+ )
+ .setWalMode(WALMode.LOG_ONLY)
+ );
((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder);
@@ -80,14 +84,14 @@ public class IgnitePdsCacheAssignmentNodeRestartsTest extends GridCommonAbstract
@Override protected void beforeTest() throws Exception {
super.beforeTest();
- U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+ cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
- U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+ cleanPersistenceDir();
super.afterTest();
}