You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2020/01/29 08:00:37 UTC

[ignite] branch master updated: IGNITE-12557 Fix possible IgniteOOM during cache destroy. - Fixes #7298.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e160c8f  IGNITE-12557 Fix possible IgniteOOM during cache destroy. - Fixes #7298.
e160c8f is described below

commit e160c8f23185b92692c9cb72d8a1aaabf113b3eb
Author: Aleksei Scherbakov <as...@apache.org>
AuthorDate: Wed Jan 29 11:00:02 2020 +0300

    IGNITE-12557 Fix possible IgniteOOM during cache destroy. - Fixes #7298.
    
    Signed-off-by: Aleksei Scherbakov <as...@apache.org>
---
 .../processors/cache/GridCacheProcessor.java       |  11 +--
 .../cache/IgniteCacheOffheapManagerImpl.java       |  21 ++++-
 .../preloader/GridDhtPartitionsExchangeFuture.java |   2 +-
 .../cache/persistence/GridCacheOffheapManager.java |  30 +++---
 ...orFindAndDeleteGarbageInPersistenceClosure.java |   8 +-
 .../IgnitePdsDestroyCacheAbstractTest.java         |  45 ++++++---
 .../persistence/IgnitePdsDestroyCacheTest.java     | 101 +++++++++++++++++++++
 7 files changed, 180 insertions(+), 38 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e64e03b..dcdcfbec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2744,15 +2744,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         // Reserve at least 2 threads for system operations.
         int parallelismLvl = U.availableThreadCount(ctx, GridIoPolicy.SYSTEM_POOL, 2);
 
-        List<IgniteBiTuple<CacheGroupContext, Boolean>> grpToStop = exchActions.cacheGroupsToStop().stream()
+        List<IgniteBiTuple<CacheGroupContext, Boolean>> grpsToStop = exchActions.cacheGroupsToStop().stream()
             .filter(a -> cacheGrps.containsKey(a.descriptor().groupId()))
             .map(a -> F.t(cacheGrps.get(a.descriptor().groupId()), a.destroy()))
             .collect(Collectors.toList());
 
-        grpToStop.forEach(t -> sharedCtx.evict().onCacheGroupStopped(t.get1()));
+        grpsToStop.forEach(t -> sharedCtx.evict().onCacheGroupStopped(t.get1()));
 
         if (!exchActions.cacheStopRequests().isEmpty())
-            removeOffheapListenerAfterCheckpoint(grpToStop);
+            removeOffheapListenerAfterCheckpoint(grpsToStop);
 
         Map<Integer, List<ExchangeActions.CacheActionData>> cachesToStop = exchActions.cacheStopRequests().stream()
                 .collect(Collectors.groupingBy(action -> action.descriptor().groupId()));
@@ -2769,7 +2769,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                             gctx.preloader().pause();
 
                         try {
-
                             if (gctx != null) {
                                 final String msg = "Failed to wait for topology update, cache group is stopping.";
 
@@ -2817,11 +2816,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             throw new IgniteException(msg, e);
         }
 
-        for (IgniteBiTuple<CacheGroupContext, Boolean> grp : grpToStop)
+        for (IgniteBiTuple<CacheGroupContext, Boolean> grp : grpsToStop)
             stopCacheGroup(grp.get1().groupId());
 
         if (!sharedCtx.kernalContext().clientNode())
-            sharedCtx.database().onCacheGroupsStopped(grpToStop);
+            sharedCtx.database().onCacheGroupsStopped(grpsToStop);
 
         if (exchActions.deactivate())
             sharedCtx.deactivate();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 0df7728..c5a42bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -156,6 +156,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     private final boolean failNodeOnPartitionInconsistency = Boolean.getBoolean(
         IgniteSystemProperties.IGNITE_FAIL_NODE_ON_UNRECOVERABLE_PARTITION_INCONSISTENCY);
 
+    /** Batch size for cache removals during destroy. */
+    private static final int BATCH_SIZE = 1000;
+
     /** */
     protected GridCacheSharedContext ctx;
 
@@ -292,8 +295,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             if (grp.sharedGroup()) {
                 assert cacheId != CU.UNDEFINED_CACHE_ID;
 
-                for (CacheDataStore store : cacheDataStores())
+                for (CacheDataStore store : cacheDataStores()) {
                     store.clear(cacheId);
+                }
 
                 // Clear non-persistent pending tree if needed.
                 if (pendingEntries != null) {
@@ -2954,7 +2958,17 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             GridCursor<? extends CacheDataRow> cur =
                 cursor(cacheId, null, null, CacheDataRowAdapter.RowData.KEY_ONLY);
 
+            int rmv = 0;
+
             while (cur.next()) {
+                if (++rmv == BATCH_SIZE) {
+                    ctx.database().checkpointReadUnlock();
+
+                    rmv = 0;
+
+                    ctx.database().checkpointReadLock();
+                }
+
                 CacheDataRow row = cur.get();
 
                 assert row.link() != 0 : row;
@@ -2980,6 +2994,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
             if (ex != null)
                 throw new IgniteCheckedException("Fail destroy store", ex);
+
+            // Allow checkpointer to progress if a partition contains fewer than BATCH_SIZE keys.
+            ctx.database().checkpointReadUnlock();
+
+            ctx.database().checkpointReadLock();
         }
 
         /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 40055f6..667fa74 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2328,7 +2328,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
                 comp.onDoneBeforeTopologyUnlock(this);
 
-            // Create and destory caches and cache proxies.
+            // Create and destroy caches and cache proxies.
             cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
 
             cctx.kernalContext().authentication().onActivate();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 3867a63..b922c35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -2590,35 +2590,31 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
 
         /** {@inheritDoc} */
         @Override public void clear(int cacheId) throws IgniteCheckedException {
+            assert ctx.database().checkpointLockIsHeldByThread();
+
             CacheDataStore delegate0 = init0(true);
 
             if (delegate0 == null)
                 return;
 
-            ctx.database().checkpointReadLock();
-            try {
-                // Clear persistent pendingTree
-                if (pendingTree != null) {
-                    PendingRow row = new PendingRow(cacheId);
+            // Clear persistent pendingTree
+            if (pendingTree != null) {
+                PendingRow row = new PendingRow(cacheId);
 
-                    GridCursor<PendingRow> cursor = pendingTree.find(row, row, PendingEntriesTree.WITHOUT_KEY);
+                GridCursor<PendingRow> cursor = pendingTree.find(row, row, PendingEntriesTree.WITHOUT_KEY);
 
-                    while (cursor.next()) {
-                        PendingRow row0 = cursor.get();
+                while (cursor.next()) {
+                    PendingRow row0 = cursor.get();
 
-                        assert row0.link != 0 : row;
+                    assert row0.link != 0 : row;
 
-                        boolean res = pendingTree.removex(row0);
+                    boolean res = pendingTree.removex(row0);
 
-                        assert res;
-                    }
+                    assert res;
                 }
-
-                delegate0.clear(cacheId);
-            }
-            finally {
-                ctx.database().checkpointReadUnlock();
             }
+
+            delegate0.clear(cacheId);
         }
 
         /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java
index 722686e..43d8a08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java
@@ -174,7 +174,13 @@ public class VisorFindAndDeleteGarbageInPersistenceClosure implements IgniteCall
             assert groupContext != null;
 
             for (Integer cacheId : e.getValue().keySet()) {
-                groupContext.offheap().stopCache(cacheId, true);
+                groupContext.shared().database().checkpointReadLock();
+                try {
+                    groupContext.offheap().stopCache(cacheId, true);
+                }
+                finally {
+                    groupContext.shared().database().checkpointReadUnlock();
+                }
 
                 ((GridCacheOffheapManager)
                     groupContext.offheap()).findAndCleanupLostIndexesForStoppedCache(cacheId);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheAbstractTest.java
index 56042bf..f870fb6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheAbstractTest.java
@@ -19,9 +19,12 @@ package org.apache.ignite.internal.processors.cache.persistence;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -31,6 +34,8 @@ import org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+
 /**
  * Base class for  {@link IgnitePdsDestroyCacheTest} and {@link IgnitePdsDestroyCacheWithoutCheckpointsTest}
  */
@@ -42,16 +47,18 @@ public abstract class IgnitePdsDestroyCacheAbstractTest extends GridCommonAbstra
     protected static final int NODES = 3;
 
     /** */
-    private static final int NUM_OF_KEYS = 100;
+    private static final int NUM_OF_KEYS = 2000;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
         return cfg.setDataStorageConfiguration(new DataStorageConfiguration()
-                            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
-                                .setMaxSize(200 * 1024 * 1024)
-                                .setPersistenceEnabled(true)));
+            .setWalSegmentSize(4 * 1024 * 1024)
+            .setPageSize(1024)
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setMaxSize(200 * 1024 * 1024)
+                .setPersistenceEnabled(true)));
     }
 
     /** {@inheritDoc} */
@@ -72,13 +79,19 @@ public abstract class IgnitePdsDestroyCacheAbstractTest extends GridCommonAbstra
 
     /**
      * @param ignite Ignite.
+     * @param singlePart {@code True} to load data into a single partition.
      */
-    private void loadCaches(Ignite ignite) {
+    protected void loadCaches(Ignite ignite, boolean singlePart) {
+        final List<Integer> keys;
+
+        keys = singlePart ? partitionKeys(ignite.cache(cacheName(0)), 1, NUM_OF_KEYS, 0) :
+            IntStream.range(0, NUM_OF_KEYS).boxed().collect(Collectors.toList());
+
         for (int i = 0; i < CACHES; i++) {
             try (IgniteDataStreamer<Object, Object> s = ignite.dataStreamer(cacheName(i))) {
                 s.allowOverwrite(true);
 
-                for (int j = 0; j < NUM_OF_KEYS; j++)
+                for (Integer j : keys)
                     s.addData(j, "cache: " + i + " data: " + j);
 
                 s.flush();
@@ -90,7 +103,7 @@ public abstract class IgnitePdsDestroyCacheAbstractTest extends GridCommonAbstra
      * @param ignite Ignite.
      */
     protected void checkDestroyCaches(Ignite ignite) throws Exception {
-        loadCaches(ignite);
+        loadCaches(ignite, false);
 
         log.warning("destroying caches....");
 
@@ -127,7 +140,7 @@ public abstract class IgnitePdsDestroyCacheAbstractTest extends GridCommonAbstra
      * @param ignite Ignite instance.
      */
     protected void checkDestroyCachesAbruptly(Ignite ignite) throws Exception {
-        loadCaches(ignite);
+        loadCaches(ignite, false);
 
         log.warning("Destroying caches");
 
@@ -176,13 +189,21 @@ public abstract class IgnitePdsDestroyCacheAbstractTest extends GridCommonAbstra
      * @param ignite Ignite instance.
      */
     protected void startGroupCachesDynamically(Ignite ignite) {
+        startGroupCachesDynamically(ignite, false);
+    }
+
+    /**
+     * @param ignite Ignite instance.
+     * @param loc {@code True} for local caches.
+     */
+    protected void startGroupCachesDynamically(Ignite ignite, boolean loc) {
         List<CacheConfiguration> ccfg = new ArrayList<>(CACHES);
 
         for (int i = 0; i < CACHES; i++)
-            ccfg.add(new CacheConfiguration<>(cacheName(i))
-                    .setGroupName(i % 2 == 0 ? "grp-even" : "grp-odd")
-                    .setBackups(1)
-                    .setAffinity(new RendezvousAffinityFunction(false, 32)));
+            ccfg.add(new CacheConfiguration<>(cacheName(i)).setCacheMode(loc ? LOCAL : CacheMode.PARTITIONED)
+                .setGroupName(i % 2 == 0 ? "grp-even" : "grp-odd")
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, 32)));
 
         ignite.createCaches(ccfg);
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java
index ed274ec..d5f7e33 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java
@@ -17,9 +17,20 @@
 
 package org.apache.ignite.internal.processors.cache.persistence;
 
+import java.lang.reflect.Field;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 
 /**
  * Test correct clean up cache configuration data after destroying cache.
@@ -90,4 +101,94 @@ public class IgnitePdsDestroyCacheTest extends IgnitePdsDestroyCacheAbstractTest
 
         checkDestroyCachesAbruptly(ignite);
     }
+
+    /**
+     * Tests that a checkpoint is not blocked forever by a concurrent cache destroy (DHT).
+     */
+    @Test
+    public void testDestroyCacheOperationNotBlockingCheckpointTest() throws Exception {
+        doTestDestroyCacheOperationNotBlockingCheckpointTest(false);
+    }
+
+    /**
+     * Tests that a checkpoint is not blocked forever by a concurrent cache destroy (local).
+     */
+    @Test
+    public void testDestroyCacheOperationNotBlockingCheckpointTest_LocalCache() throws Exception {
+        doTestDestroyCacheOperationNotBlockingCheckpointTest(true);
+    }
+
+    /**
+     * @param loc {@code True} to test local caches, {@code false} to test partitioned (DHT) caches.
+     */
+    private void doTestDestroyCacheOperationNotBlockingCheckpointTest(boolean loc) throws Exception {
+        final IgniteEx ignite = startGrids(1);
+
+        ignite.cluster().active(true);
+
+        startGroupCachesDynamically(ignite, loc);
+
+        loadCaches(ignite, !loc);
+
+        // It's important to clear cache in group having > 1 caches.
+        final String cacheName = cacheName(0);
+        final CacheGroupContext grp = ignite.cachex(cacheName).context().group();
+
+        final IgniteCacheOffheapManager offheap = grp.offheap();
+
+        IgniteCacheOffheapManager mgr = Mockito.spy(offheap);
+
+        final CountDownLatch checkpointLocked = new CountDownLatch(1);
+        final CountDownLatch cpFutCreated = new CountDownLatch(1);
+        final CountDownLatch realMtdCalled = new CountDownLatch(1);
+        final CountDownLatch checked = new CountDownLatch(1);
+
+        Mockito.doAnswer(invocation -> {
+            checkpointLocked.countDown();
+
+            assertTrue(U.await(cpFutCreated, 30, TimeUnit.SECONDS));
+
+            Object ret = invocation.callRealMethod();
+
+            // Once clearing has run, the cp future must eventually complete and the cp read lock must be reacquired.
+            realMtdCalled.countDown();
+
+            // Wait for checkpoint future while holding lock.
+            U.awaitQuiet(checked);
+
+            return ret;
+        }).when(mgr).stopCache(Mockito.anyInt(), Mockito.anyBoolean());
+
+        final Field field = U.findField(CacheGroupContext.class, "offheapMgr");
+        field.set(grp, mgr);
+
+        final IgniteInternalFuture<Object> fut = runAsync(() -> {
+            assertTrue(U.await(checkpointLocked, 30, TimeUnit.SECONDS));
+
+            // Trigger checkpoint while holding checkpoint read lock on cache destroy.
+            final IgniteInternalFuture cpFut = ignite.context().cache().context().database().wakeupForCheckpoint("test");
+
+            assertFalse(cpFut.isDone());
+
+            cpFutCreated.countDown();
+
+            assertTrue(U.await(realMtdCalled, 30, TimeUnit.SECONDS));
+
+            try {
+                cpFut.get(3_000); // Future must be completed after cache clearing but before releasing checkpoint lock.
+            }
+            finally {
+                checked.countDown();
+            }
+
+            return null;
+        });
+
+        if (loc)
+            ignite.cache(cacheName).close();
+        else
+            ignite.destroyCache(cacheName);
+
+        fut.get();
+    }
 }