You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by sk...@apache.org on 2022/03/15 11:55:07 UTC
[ignite] branch master updated: IGNITE-16672 Fixed an issue that led to "Failed to get page store for the given cache ID" error on cache start. Fixes #9880
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c3e5fc2 IGNITE-16672 Fixed an issue that led to "Failed to get page store for the given cache ID" error on cache start. Fixes #9880
c3e5fc2 is described below
commit c3e5fc2511a7398df6affd3da4c1d7574d00ce97
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Tue Mar 15 14:54:30 2022 +0300
IGNITE-16672 Fixed an issue that led to "Failed to get page store for the given cache ID" error on cache start. Fixes #9880
---
.../cache/CacheAffinitySharedManager.java | 2 +-
.../processors/cache/ClusterCachesInfo.java | 75 ++++++++---------
.../processors/cache/GridCacheProcessor.java | 17 ++--
.../IgnitePdsNodeJoinWithCachesStopping.java | 94 ++++++++++++++++++++++
4 files changed, 138 insertions(+), 50 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index bd2c4f6..68c7d1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -854,7 +854,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
processCacheStopRequests(fut, crd, exchActions, true);
- cctx.cache().forceCloseCaches(exchActions);
+ cctx.cache().forceCloseCaches(fut.initialVersion(), exchActions);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 0b8a022..910a8e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -33,6 +33,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
@@ -117,16 +119,12 @@ public class ClusterCachesInfo {
private final GridKernalContext ctx;
/**
- * Map contains cache descriptors that were removed from {@link #registeredCaches} due to cache stop request.
- * Such descriptors will be removed from the map only after whole cache stop process is finished.
- */
- private final ConcurrentMap<String, DynamicCacheDescriptor> markedForDeletionCaches = new ConcurrentHashMap<>();
-
- /**
* Map contains cache group descriptors that were removed from {@link #registeredCacheGrps} due to cache stop request.
* Such descriptors will be removed from the map only after whole cache stop process is finished.
+ * Affinity topology version equals the version which will be applied after a cache group is completely removed.
*/
- private final ConcurrentMap<Integer, CacheGroupDescriptor> markedForDeletionCacheGrps = new ConcurrentHashMap<>();
+ private final ConcurrentNavigableMap<AffinityTopologyVersion, Map<Integer, CacheGroupDescriptor>>
+ markedForDeletionCacheGrps = new ConcurrentSkipListMap<>();
/** Dynamic caches. */
private final ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
@@ -558,7 +556,7 @@ public class ClusterCachesInfo {
* due to dynamic cache start failure.
*
* @param failMsg Dynamic change request fail message.
- * @param topVer Topology version.
+ * @param topVer Current topology version.
*/
public void onCacheChangeRequested(DynamicCacheChangeFailureMessage failMsg, AffinityTopologyVersion topVer) {
ExchangeActions exchangeActions = new ExchangeActions();
@@ -580,7 +578,7 @@ public class ClusterCachesInfo {
/**
* @param batch Cache change request.
- * @param topVer Topology version.
+ * @param topVer Current topology version.
* @return {@code True} if minor topology version should be increased.
*/
public boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) {
@@ -647,7 +645,7 @@ public class ClusterCachesInfo {
/**
* @param exchangeActions Exchange actions to update.
* @param reqs Requests.
- * @param topVer Topology version.
+ * @param topVer Current topology version.
* @param persistedCfgs {@code True} if process start of persisted caches during cluster activation.
* @return Process result.
*/
@@ -706,7 +704,7 @@ public class ClusterCachesInfo {
/**
* @param req Cache change request.
* @param exchangeActions Exchange actions to update.
- * @param topVer Topology version.
+ * @param topVer Current topology version.
* @param persistedCfgs {@code True} if process start of persisted caches during cluster activation.
* @param res Accumulator for cache change process results.
* @param reqsToComplete Accumulator for cache change requests which should be completed after
@@ -799,7 +797,7 @@ public class ClusterCachesInfo {
return;
}
- if (!processStopCacheRequest(exchangeActions, req, res, cacheName, desc))
+ if (!processStopCacheRequest(exchangeActions, req, res, cacheName, desc, topVer.nextMinorVersion()))
return;
needExchange = true;
@@ -821,6 +819,7 @@ public class ClusterCachesInfo {
* @param exchangeActions Exchange actions to update.
* @param cacheName Cache name.
* @param desc Dynamic cache descriptor.
+ * @param topVer Topology version that will be applied after the corresponding partition map exchange.
* @return {@code true} if stop request can be proceed.
*/
private boolean processStopCacheRequest(
@@ -828,7 +827,8 @@ public class ClusterCachesInfo {
DynamicCacheChangeRequest req,
CacheChangeProcessResult res,
String cacheName,
- DynamicCacheDescriptor desc
+ DynamicCacheDescriptor desc,
+ AffinityTopologyVersion topVer
) {
if (ctx.cache().context().snapshotMgr().isSnapshotCreating()) {
IgniteCheckedException err = new IgniteCheckedException(SNP_IN_PROGRESS_ERR_MSG);
@@ -842,14 +842,11 @@ public class ClusterCachesInfo {
return false;
}
- DynamicCacheDescriptor old = registeredCaches.get(cacheName);
+ DynamicCacheDescriptor old = registeredCaches.remove(cacheName);
assert old != null && old == desc : "Dynamic cache map was concurrently modified [req=" + req + ']';
- markedForDeletionCaches.put(cacheName, old);
-
- DynamicCacheDescriptor removedCacheDescriptor = registeredCaches.remove(cacheName);
- registeredCachesById.remove(removedCacheDescriptor.cacheId());
+ registeredCachesById.remove(old.cacheId());
if (req.restart()) {
IgniteUuid restartId = req.restartId();
@@ -868,7 +865,9 @@ public class ClusterCachesInfo {
grpDesc.onCacheStopped(desc.cacheName(), desc.cacheId());
if (!grpDesc.hasCaches()) {
- markedForDeletionCacheGrps.put(grpDesc.groupId(), grpDesc);
+ markedForDeletionCacheGrps
+ .computeIfAbsent(topVer, (map) -> new ConcurrentHashMap<>())
+ .put(grpDesc.groupId(), grpDesc);
registeredCacheGrps.remove(grpDesc.groupId());
@@ -1558,14 +1557,12 @@ public class ClusterCachesInfo {
saveCacheConfiguration(entry.getKey());
}
- for (DynamicCacheDescriptor descriptor : cachesToSave) {
+ for (DynamicCacheDescriptor descriptor : cachesToSave)
saveCacheConfiguration(descriptor);
- }
}
else if (patchesToApply.isEmpty()) {
- for (DynamicCacheDescriptor descriptor : cachesToSave) {
+ for (DynamicCacheDescriptor descriptor : cachesToSave)
saveCacheConfiguration(descriptor);
- }
}
}
}
@@ -1645,31 +1642,27 @@ public class ClusterCachesInfo {
}
/**
- * @param cacheName Cache name.
- */
- public void cleanupRemovedCache(String cacheName) {
- markedForDeletionCaches.remove(cacheName);
- }
-
- /**
- * @param grpId Group ID.
- */
- public void cleanupRemovedGroup(int grpId) {
- markedForDeletionCacheGrps.remove(grpId);
- }
-
- /**
- * @param cacheName Cache name.
+ * Cleanups cache descriptors that belong to the {@code topVer} and earlier.
+ *
+ * @param topVer Topology version.
*/
- public @Nullable DynamicCacheDescriptor markedForDeletionCacheDesc(String cacheName) {
- return markedForDeletionCaches.get(cacheName);
+ public void cleanupRemovedCaches(AffinityTopologyVersion topVer) {
+ markedForDeletionCacheGrps.headMap(topVer, true).clear();
}
/**
* @param grpId Group id.
*/
public @Nullable CacheGroupDescriptor markedForDeletionCacheGroupDesc(int grpId) {
- return markedForDeletionCacheGrps.get(grpId);
+ // Find the "earliest" available descriptor.
+ for (Map<Integer, CacheGroupDescriptor> descriptors : markedForDeletionCacheGrps.values()) {
+ CacheGroupDescriptor desc = descriptors.get(grpId);
+
+ if (desc != null)
+ return desc;
+ }
+
+ return null;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 1d6fc6b..68131c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -554,8 +554,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
cctx.cleanup();
-
- cachesInfo.cleanupRemovedCache(cctx.name());
}
/**
@@ -583,8 +581,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
grp.removeIOStatistic(destroy);
sharedCtx.evict().cleanupRemovedGroup(grp.groupId());
-
- cachesInfo.cleanupRemovedGroup(grp.groupId());
}
/**
@@ -2789,18 +2785,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* Called during the rollback of the exchange partitions procedure in order to stop the given cache even if it's not
* fully initialized (e.g. failed on cache init stage).
*
+ * @param topVer Topology version related to the given {@code exchActions}.
* @param exchActions Stop requests.
*/
- void forceCloseCaches(ExchangeActions exchActions) {
+ void forceCloseCaches(AffinityTopologyVersion topVer, ExchangeActions exchActions) {
assert exchActions != null && !exchActions.cacheStopRequests().isEmpty();
- processCacheStopRequestOnExchangeDone(exchActions);
+ processCacheStopRequestOnExchangeDone(topVer, exchActions);
}
/**
+ * @param topVer Topology version related to the given {@code exchActions}.
* @param exchActions Change requests.
*/
- private void processCacheStopRequestOnExchangeDone(ExchangeActions exchActions) {
+ private void processCacheStopRequestOnExchangeDone(AffinityTopologyVersion topVer, ExchangeActions exchActions) {
// Reserve at least 2 threads for system operations.
int parallelismLvl = U.availableThreadCount(ctx, GridIoPolicy.SYSTEM_POOL, 2);
@@ -2886,6 +2884,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
throw new IgniteException(msg, e);
}
+ finally {
+ cachesInfo.cleanupRemovedCaches(topVer);
+ }
for (IgniteBiTuple<CacheGroupContext, Boolean> grp : grpsToStop)
stopCacheGroup(grp.get1().groupId(), grp.get2());
@@ -2981,7 +2982,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.dataStructures().restoreStructuresState(ctx);
if (err == null)
- processCacheStopRequestOnExchangeDone(exchActions);
+ processCacheStopRequestOnExchangeDone(cacheStartVer, exchActions);
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNodeJoinWithCachesStopping.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNodeJoinWithCachesStopping.java
index a2f557a..911c3e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNodeJoinWithCachesStopping.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNodeJoinWithCachesStopping.java
@@ -17,16 +17,22 @@
package org.apache.ignite.internal.processors.cache.persistence;
+import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
/**
* Test checks correctness of simultaneous node join and massive caches stopping.
*/
@@ -35,6 +41,8 @@ public class IgnitePdsNodeJoinWithCachesStopping extends GridCommonAbstractTest
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
return cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setMaxSize(200 * 1024 * 1024)
@@ -90,4 +98,90 @@ public class IgnitePdsNodeJoinWithCachesStopping extends GridCommonAbstractTest
assertTrue(gridStartFut.get());
}
+
+ /**
+ * Checks that starting and stopping caches with the same name is handled correctly during a long (blocked) PME.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStartStopCacheWithLongPME() throws Exception {
+ IgniteEx crd = (IgniteEx)startGridsMultiThreaded(2);
+
+ IgniteEx client = startClientGrid(2);
+
+ awaitPartitionMapExchange();
+
+ TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(grid(1));
+
+ spi1.blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+
+ // Start a new cache and block PME in order to start/stop this cache during the blocked PME.
+ IgniteInternalFuture<?> startFut1 = GridTestUtils.runAsync(() -> {
+ try {
+ client.getOrCreateCache(DEFAULT_CACHE_NAME);
+ }
+ catch (CacheException e) {
+ throw new RuntimeException("Failed to create a new cache (step 1)", e);
+ }
+ });
+
+ // Wait for initialization phase of PME.
+ spi1.waitForBlocked();
+
+ // Let's destroy the cache that is being created at this time.
+ // This request should lead to removing the corresponding cache descriptor.
+ // See ClusterCachesInfo.onCacheChangeRequested(DynamicCacheChangeBatch, AffinityTopologyVersion)
+ IgniteInternalFuture<?> stopFut1 = GridTestUtils.runAsync(() -> {
+ try {
+ client.destroyCache(DEFAULT_CACHE_NAME);
+ }
+ catch (CacheException e) {
+ throw new RuntimeException("Failed to destroy new cache (step 1)", e);
+ }
+ });
+
+ assertTrue(
+ "Failed to wait for DynamicCacheChangeBatch message (destroy, step 1)",
+ waitForCondition(() -> crd.context().discovery().topologyVersionEx().minorTopologyVersion() == 2, getTestTimeout()));
+
+ // Start and stop the cache once again while the first PME is still blocked. This registers and removes
+ // a new descriptor with the same name in ClusterCachesInfo; the group descriptor marked for deletion at
+ // step 1 must not be cleaned up by this second stop (previously it was lost at this point).
+ IgniteInternalFuture<?> startFut2 = GridTestUtils.runAsync(() -> {
+ try {
+ client.getOrCreateCache(DEFAULT_CACHE_NAME);
+ }
+ catch (CacheException e) {
+ throw new RuntimeException("Failed to create a new cache (step 2)", e);
+ }
+ });
+
+ assertTrue(
+ "Failed to wait for DynamicCacheChangeBatch message (create, step 2)",
+ waitForCondition(() -> crd.context().discovery().topologyVersionEx().minorTopologyVersion() == 3, getTestTimeout()));
+
+ IgniteInternalFuture<?> stopFut2 = GridTestUtils.runAsync(() -> {
+ try {
+ client.destroyCache(DEFAULT_CACHE_NAME);
+ }
+ catch (CacheException e) {
+ throw new RuntimeException("Failed to destroy new cache (step 2)", e);
+ }
+ });
+
+ assertTrue(
+ "Failed to wait for DynamicCacheChangeBatch message (destroy, step 2)",
+ waitForCondition(() -> crd.context().discovery().topologyVersionEx().minorTopologyVersion() == 4, getTestTimeout()));
+
+ // Unblock the initial PME.
+ spi1.stopBlock();
+
+ startFut1.get();
+ stopFut1.get();
+ startFut2.get();
+ stopFut2.get();
+
+ assertNull("The '" + DEFAULT_CACHE_NAME + "' cache is not destroyed.", crd.cache(DEFAULT_CACHE_NAME));
+ }
}