Posted to commits@ignite.apache.org by sk...@apache.org on 2022/03/15 11:55:07 UTC

[ignite] branch master updated: IGNITE-16672 Fixed an issue that led to "Failed to get page store for the given cache ID" error on cache start. Fixes #9880

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c3e5fc2  IGNITE-16672 Fixed an issue that led to "Failed to get page store for the given cache ID" error on cache start. Fixes #9880
c3e5fc2 is described below

commit c3e5fc2511a7398df6affd3da4c1d7574d00ce97
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Tue Mar 15 14:54:30 2022 +0300

    IGNITE-16672 Fixed an issue that led to "Failed to get page store for the given cache ID" error on cache start. Fixes #9880
---
 .../cache/CacheAffinitySharedManager.java          |  2 +-
 .../processors/cache/ClusterCachesInfo.java        | 75 ++++++++---------
 .../processors/cache/GridCacheProcessor.java       | 17 ++--
 .../IgnitePdsNodeJoinWithCachesStopping.java       | 94 ++++++++++++++++++++++
 4 files changed, 138 insertions(+), 50 deletions(-)
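
At the heart of the change, ClusterCachesInfo no longer tracks cache group descriptors marked for deletion in a single flat map keyed by group id. They are now grouped by the affinity topology version that takes effect once the group is completely removed, and cleanup after an exchange drops only the versions that exchange covers. As a result, a repeated start/stop of a cache with the same name during one long partition map exchange can no longer clear a descriptor the in-flight exchange still needs, which previously surfaced as the "Failed to get page store for the given cache ID" error. The following sketch shows this bookkeeping in isolation; the class name, method names, and the plain long used in place of AffinityTopologyVersion are simplified stand-ins chosen for the example, not the actual Ignite types.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Simplified stand-in for the marked-for-deletion bookkeeping introduced by this commit. */
public class MarkedForDeletionSketch {
    /** Stand-in for CacheGroupDescriptor. */
    static class GroupDescriptor {
        final int grpId;

        GroupDescriptor(int grpId) { this.grpId = grpId; }
    }

    /**
     * Descriptors removed on cache stop, grouped by the topology version (modelled here as a plain
     * long instead of AffinityTopologyVersion) that takes effect once the group is fully removed.
     */
    private final ConcurrentNavigableMap<Long, Map<Integer, GroupDescriptor>> markedForDeletionGrps =
        new ConcurrentSkipListMap<>();

    /** Called when the last cache of a group is stopped; topVer is the version applied after the exchange. */
    void markForDeletion(long topVer, GroupDescriptor desc) {
        markedForDeletionGrps
            .computeIfAbsent(topVer, v -> new ConcurrentHashMap<Integer, GroupDescriptor>())
            .put(desc.grpId, desc);
    }

    /** Returns the "earliest" descriptor still pending removal for the given group id, or null. */
    GroupDescriptor markedForDeletionDesc(int grpId) {
        for (Map<Integer, GroupDescriptor> descriptors : markedForDeletionGrps.values()) {
            GroupDescriptor desc = descriptors.get(grpId);

            if (desc != null)
                return desc;
        }

        return null;
    }

    /** Drops only the descriptors whose exchange already completed (all versions <= topVer). */
    void cleanupRemoved(long topVer) {
        markedForDeletionGrps.headMap(topVer, true).clear();
    }
}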

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index bd2c4f6..68c7d1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -854,7 +854,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         processCacheStopRequests(fut, crd, exchActions, true);
 
-        cctx.cache().forceCloseCaches(exchActions);
+        cctx.cache().forceCloseCaches(fut.initialVersion(), exchActions);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 0b8a022..910a8e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -33,6 +33,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
@@ -117,16 +119,12 @@ public class ClusterCachesInfo {
     private final GridKernalContext ctx;
 
     /**
-     * Map contains cache descriptors that were removed from {@link #registeredCaches} due to cache stop request.
-     * Such descriptors will be removed from the map only after whole cache stop process is finished.
-     */
-    private final ConcurrentMap<String, DynamicCacheDescriptor> markedForDeletionCaches = new ConcurrentHashMap<>();
-
-    /**
      * Map contains cache group descriptors that were removed from {@link #registeredCacheGrps} due to cache stop request.
      * Such descriptors will be removed from the map only after whole cache stop process is finished.
+     * The affinity topology version key equals the version that will be applied after a cache group is completely removed.
      */
-    private final ConcurrentMap<Integer, CacheGroupDescriptor> markedForDeletionCacheGrps = new ConcurrentHashMap<>();
+    private final ConcurrentNavigableMap<AffinityTopologyVersion, Map<Integer, CacheGroupDescriptor>>
+        markedForDeletionCacheGrps = new ConcurrentSkipListMap<>();
 
     /** Dynamic caches. */
     private final ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
@@ -558,7 +556,7 @@ public class ClusterCachesInfo {
      * due to dynamic cache start failure.
      *
      * @param failMsg Dynamic change request fail message.
-     * @param topVer Topology version.
+     * @param topVer Current topology version.
      */
     public void onCacheChangeRequested(DynamicCacheChangeFailureMessage failMsg, AffinityTopologyVersion topVer) {
         ExchangeActions exchangeActions = new ExchangeActions();
@@ -580,7 +578,7 @@ public class ClusterCachesInfo {
 
     /**
      * @param batch Cache change request.
-     * @param topVer Topology version.
+     * @param topVer Current topology version.
      * @return {@code True} if minor topology version should be increased.
      */
     public boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) {
@@ -647,7 +645,7 @@ public class ClusterCachesInfo {
     /**
      * @param exchangeActions Exchange actions to update.
      * @param reqs Requests.
-     * @param topVer Topology version.
+     * @param topVer Current topology version.
      * @param persistedCfgs {@code True} if process start of persisted caches during cluster activation.
      * @return Process result.
      */
@@ -706,7 +704,7 @@ public class ClusterCachesInfo {
     /**
      * @param req Cache change request.
      * @param exchangeActions Exchange actions to update.
-     * @param topVer Topology version.
+     * @param topVer Current topology version.
      * @param persistedCfgs {@code True} if process start of persisted caches during cluster activation.
      * @param res Accumulator for cache change process results.
      * @param reqsToComplete Accumulator for cache change requests which should be completed after
@@ -799,7 +797,7 @@ public class ClusterCachesInfo {
                     return;
                 }
 
-                if (!processStopCacheRequest(exchangeActions, req, res, cacheName, desc))
+                if (!processStopCacheRequest(exchangeActions, req, res, cacheName, desc, topVer.nextMinorVersion()))
                     return;
 
                 needExchange = true;
@@ -821,6 +819,7 @@ public class ClusterCachesInfo {
      * @param exchangeActions Exchange actions to update.
      * @param cacheName Cache name.
      * @param desc Dynamic cache descriptor.
+     * @param topVer Topology version that will be applied after the corresponding partition map exchange.
      * @return {@code true} if stop request can be proceed.
      */
     private boolean processStopCacheRequest(
@@ -828,7 +827,8 @@ public class ClusterCachesInfo {
         DynamicCacheChangeRequest req,
         CacheChangeProcessResult res,
         String cacheName,
-        DynamicCacheDescriptor desc
+        DynamicCacheDescriptor desc,
+        AffinityTopologyVersion topVer
     ) {
         if (ctx.cache().context().snapshotMgr().isSnapshotCreating()) {
             IgniteCheckedException err = new IgniteCheckedException(SNP_IN_PROGRESS_ERR_MSG);
@@ -842,14 +842,11 @@ public class ClusterCachesInfo {
             return false;
         }
 
-        DynamicCacheDescriptor old = registeredCaches.get(cacheName);
+        DynamicCacheDescriptor old = registeredCaches.remove(cacheName);
 
         assert old != null && old == desc : "Dynamic cache map was concurrently modified [req=" + req + ']';
 
-        markedForDeletionCaches.put(cacheName, old);
-
-        DynamicCacheDescriptor removedCacheDescriptor = registeredCaches.remove(cacheName);
-        registeredCachesById.remove(removedCacheDescriptor.cacheId());
+        registeredCachesById.remove(old.cacheId());
 
         if (req.restart()) {
             IgniteUuid restartId = req.restartId();
@@ -868,7 +865,9 @@ public class ClusterCachesInfo {
         grpDesc.onCacheStopped(desc.cacheName(), desc.cacheId());
 
         if (!grpDesc.hasCaches()) {
-            markedForDeletionCacheGrps.put(grpDesc.groupId(), grpDesc);
+            markedForDeletionCacheGrps
+                .computeIfAbsent(topVer, (map) -> new ConcurrentHashMap<>())
+                .put(grpDesc.groupId(), grpDesc);
 
             registeredCacheGrps.remove(grpDesc.groupId());
 
@@ -1558,14 +1557,12 @@ public class ClusterCachesInfo {
                         saveCacheConfiguration(entry.getKey());
                 }
 
-                for (DynamicCacheDescriptor descriptor : cachesToSave) {
+                for (DynamicCacheDescriptor descriptor : cachesToSave)
                     saveCacheConfiguration(descriptor);
-                }
             }
             else if (patchesToApply.isEmpty()) {
-                for (DynamicCacheDescriptor descriptor : cachesToSave) {
+                for (DynamicCacheDescriptor descriptor : cachesToSave)
                     saveCacheConfiguration(descriptor);
-                }
             }
         }
     }
@@ -1645,31 +1642,27 @@ public class ClusterCachesInfo {
     }
 
     /**
-     * @param cacheName Cache name.
-     */
-    public void cleanupRemovedCache(String cacheName) {
-        markedForDeletionCaches.remove(cacheName);
-    }
-
-    /**
-     * @param grpId Group ID.
-     */
-    public void cleanupRemovedGroup(int grpId) {
-        markedForDeletionCacheGrps.remove(grpId);
-    }
-
-    /**
-     * @param cacheName Cache name.
+     * Cleans up cache group descriptors that belong to the given {@code topVer} and earlier.
+     *
+     * @param topVer Topology version.
      */
-    public @Nullable DynamicCacheDescriptor markedForDeletionCacheDesc(String cacheName) {
-        return markedForDeletionCaches.get(cacheName);
+    public void cleanupRemovedCaches(AffinityTopologyVersion topVer) {
+        markedForDeletionCacheGrps.headMap(topVer, true).clear();
     }
 
     /**
      * @param grpId Group id.
      */
     public @Nullable CacheGroupDescriptor markedForDeletionCacheGroupDesc(int grpId) {
-        return markedForDeletionCacheGrps.get(grpId);
+        // Find the "earliest" available descriptor.
+        for (Map<Integer, CacheGroupDescriptor> descriptors : markedForDeletionCacheGrps.values()) {
+            CacheGroupDescriptor desc = descriptors.get(grpId);
+
+            if (desc != null)
+                return desc;
+        }
+
+        return null;
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 1d6fc6b..68131c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -554,8 +554,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         }
 
         cctx.cleanup();
-
-        cachesInfo.cleanupRemovedCache(cctx.name());
     }
 
     /**
@@ -583,8 +581,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         grp.removeIOStatistic(destroy);
 
         sharedCtx.evict().cleanupRemovedGroup(grp.groupId());
-
-        cachesInfo.cleanupRemovedGroup(grp.groupId());
     }
 
     /**
@@ -2789,18 +2785,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * Called during the rollback of the exchange partitions procedure in order to stop the given cache even if it's not
      * fully initialized (e.g. failed on cache init stage).
      *
+     * @param topVer Topology version related to the given {@code exchActions}.
      * @param exchActions Stop requests.
      */
-    void forceCloseCaches(ExchangeActions exchActions) {
+    void forceCloseCaches(AffinityTopologyVersion topVer, ExchangeActions exchActions) {
         assert exchActions != null && !exchActions.cacheStopRequests().isEmpty();
 
-        processCacheStopRequestOnExchangeDone(exchActions);
+        processCacheStopRequestOnExchangeDone(topVer, exchActions);
     }
 
     /**
+     * @param topVer Topology version related to the given {@code exchActions}.
      * @param exchActions Change requests.
      */
-    private void processCacheStopRequestOnExchangeDone(ExchangeActions exchActions) {
+    private void processCacheStopRequestOnExchangeDone(AffinityTopologyVersion topVer, ExchangeActions exchActions) {
         // Reserve at least 2 threads for system operations.
         int parallelismLvl = U.availableThreadCount(ctx, GridIoPolicy.SYSTEM_POOL, 2);
 
@@ -2886,6 +2884,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             throw new IgniteException(msg, e);
         }
+        finally {
+            cachesInfo.cleanupRemovedCaches(topVer);
+        }
 
         for (IgniteBiTuple<CacheGroupContext, Boolean> grp : grpsToStop)
             stopCacheGroup(grp.get1().groupId(), grp.get2());
@@ -2981,7 +2982,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             ctx.dataStructures().restoreStructuresState(ctx);
 
         if (err == null)
-            processCacheStopRequestOnExchangeDone(exchActions);
+            processCacheStopRequestOnExchangeDone(cacheStartVer, exchActions);
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNodeJoinWithCachesStopping.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNodeJoinWithCachesStopping.java
index a2f557a..911c3e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNodeJoinWithCachesStopping.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNodeJoinWithCachesStopping.java
@@ -17,16 +17,22 @@
 
 package org.apache.ignite.internal.processors.cache.persistence;
 
+import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
 /**
  * Test checks correctness of simultaneous node join and massive caches stopping.
  */
@@ -35,6 +41,8 @@ public class IgnitePdsNodeJoinWithCachesStopping extends GridCommonAbstractTest
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
         return cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                 .setMaxSize(200 * 1024 * 1024)
@@ -90,4 +98,90 @@ public class IgnitePdsNodeJoinWithCachesStopping extends GridCommonAbstractTest
 
         assertTrue(gridStartFut.get());
     }
+
+    /**
+     * The test checks the correctness of handling stop and start of caches with the same name during a long PME.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testStartStopCacheWithLongPME() throws Exception {
+        IgniteEx crd = (IgniteEx)startGridsMultiThreaded(2);
+
+        IgniteEx client = startClientGrid(2);
+
+        awaitPartitionMapExchange();
+
+        TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(grid(1));
+
+        spi1.blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+
+        // Start a new cache and block PME in order to start/stop this cache during the blocked PME.
+        IgniteInternalFuture<?> startFut1 = GridTestUtils.runAsync(() -> {
+            try {
+                client.getOrCreateCache(DEFAULT_CACHE_NAME);
+            }
+            catch (CacheException e) {
+                throw new RuntimeException("Failed to create a new cache (step 1)", e);
+            }
+        });
+
+        // Wait for initialization phase of PME.
+        spi1.waitForBlocked();
+
+        // Let's destroy the cache that is being created at this time.
+        // This request should lead to removing the corresponding cache descriptor.
+        // See ClusterCachesInfo.onCacheChangeRequested(DynamicCacheChangeBatch, AffinityTopologyVersion)
+        IgniteInternalFuture<?> stopFut1 = GridTestUtils.runAsync(() -> {
+            try {
+                client.destroyCache(DEFAULT_CACHE_NAME);
+            }
+            catch (CacheException e) {
+                throw new RuntimeException("Failed to destroy new cache (step 1)", e);
+            }
+        });
+
+        assertTrue(
+            "Failed to wait for DynamicCacheChangeBatch message (destroy, step 1)",
+            waitForCondition(() -> crd.context().discovery().topologyVersionEx().minorTopologyVersion() == 2, getTestTimeout()));
+
+        // Let's start and stop the cache once again to clean up ClusterCachesInfo, i.e.
+        // registeredCaches and markedForDeletionCaches will be cleaned,
+        // and therefore, the corresponding cache descriptor will be lost.
+        IgniteInternalFuture<?> startFut2 = GridTestUtils.runAsync(() -> {
+            try {
+                client.getOrCreateCache(DEFAULT_CACHE_NAME);
+            }
+            catch (CacheException e) {
+                throw new RuntimeException("Failed to create a new cache (step 2)", e);
+            }
+        });
+
+        assertTrue(
+            "Failed to wait for DynamicCacheChangeBatch message (create, step 2)",
+            waitForCondition(() -> crd.context().discovery().topologyVersionEx().minorTopologyVersion() == 3, getTestTimeout()));
+
+        IgniteInternalFuture<?> stopFut2 = GridTestUtils.runAsync(() -> {
+            try {
+                client.destroyCache(DEFAULT_CACHE_NAME);
+            }
+            catch (CacheException e) {
+                throw new RuntimeException("Failed to destroy new cache (step 1)", e);
+            }
+        });
+
+        assertTrue(
+            "Failed to wait for DynamicCacheChangeBatch message (create, step 2)",
+            waitForCondition(() -> crd.context().discovery().topologyVersionEx().minorTopologyVersion() == 4, getTestTimeout()));
+
+        // Unblock the initial PME.
+        spi1.stopBlock();
+
+        startFut1.get();
+        stopFut1.get();
+        startFut2.get();
+        stopFut2.get();
+
+        assertNull("The '" + DEFAULT_CACHE_NAME + "' cache is not destroyed.", crd.cache(DEFAULT_CACHE_NAME));
+    }
 }
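
For illustration only, the sequence the new test drives maps onto the sketch above roughly as follows; the group id and version numbers are invented for this example and are not taken from the test.

/** Hypothetical usage of the sketch above; run with assertions enabled (-ea). */
public class MarkedForDeletionSketchExample {
    public static void main(String[] args) {
        MarkedForDeletionSketch info = new MarkedForDeletionSketch();

        // First destroy request: the group descriptor is parked under the exchange version that removes it.
        info.markForDeletion(2L, new MarkedForDeletionSketch.GroupDescriptor(42));

        // A later stop of a cache with the same name is parked under a newer version
        // instead of replacing or prematurely clearing the first entry.
        info.markForDeletion(4L, new MarkedForDeletionSketch.GroupDescriptor(42));

        // Completing the first exchange cleans up only versions <= 2; the version-4 entry survives.
        info.cleanupRemoved(2L);
        assert info.markedForDeletionDesc(42) != null;

        // Completing the second exchange finally drops everything for this group.
        info.cleanupRemoved(4L);
        assert info.markedForDeletionDesc(42) == null;
    }
}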