You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text version.
Posted to commits@ignite.apache.org by as...@apache.org on 2020/01/29 08:00:37 UTC
[ignite] branch master updated: IGNITE-12557 Fix possible IgniteOOM during cache destroy. - Fixes #7298.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e160c8f IGNITE-12557 Fix possible IgniteOOM during cache destroy. - Fixes #7298.
e160c8f is described below
commit e160c8f23185b92692c9cb72d8a1aaabf113b3eb
Author: Aleksei Scherbakov <as...@apache.org>
AuthorDate: Wed Jan 29 11:00:02 2020 +0300
IGNITE-12557 Fix possible IgniteOOM during cache destroy. - Fixes #7298.
Signed-off-by: Aleksei Scherbakov <as...@apache.org>
---
.../processors/cache/GridCacheProcessor.java | 11 +--
.../cache/IgniteCacheOffheapManagerImpl.java | 21 ++++-
.../preloader/GridDhtPartitionsExchangeFuture.java | 2 +-
.../cache/persistence/GridCacheOffheapManager.java | 30 +++---
...orFindAndDeleteGarbageInPersistenceClosure.java | 8 +-
.../IgnitePdsDestroyCacheAbstractTest.java | 45 ++++++---
.../persistence/IgnitePdsDestroyCacheTest.java | 101 +++++++++++++++++++++
7 files changed, 180 insertions(+), 38 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e64e03b..dcdcfbec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2744,15 +2744,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
// Reserve at least 2 threads for system operations.
int parallelismLvl = U.availableThreadCount(ctx, GridIoPolicy.SYSTEM_POOL, 2);
- List<IgniteBiTuple<CacheGroupContext, Boolean>> grpToStop = exchActions.cacheGroupsToStop().stream()
+ List<IgniteBiTuple<CacheGroupContext, Boolean>> grpsToStop = exchActions.cacheGroupsToStop().stream()
.filter(a -> cacheGrps.containsKey(a.descriptor().groupId()))
.map(a -> F.t(cacheGrps.get(a.descriptor().groupId()), a.destroy()))
.collect(Collectors.toList());
- grpToStop.forEach(t -> sharedCtx.evict().onCacheGroupStopped(t.get1()));
+ grpsToStop.forEach(t -> sharedCtx.evict().onCacheGroupStopped(t.get1()));
if (!exchActions.cacheStopRequests().isEmpty())
- removeOffheapListenerAfterCheckpoint(grpToStop);
+ removeOffheapListenerAfterCheckpoint(grpsToStop);
Map<Integer, List<ExchangeActions.CacheActionData>> cachesToStop = exchActions.cacheStopRequests().stream()
.collect(Collectors.groupingBy(action -> action.descriptor().groupId()));
@@ -2769,7 +2769,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
gctx.preloader().pause();
try {
-
if (gctx != null) {
final String msg = "Failed to wait for topology update, cache group is stopping.";
@@ -2817,11 +2816,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
throw new IgniteException(msg, e);
}
- for (IgniteBiTuple<CacheGroupContext, Boolean> grp : grpToStop)
+ for (IgniteBiTuple<CacheGroupContext, Boolean> grp : grpsToStop)
stopCacheGroup(grp.get1().groupId());
if (!sharedCtx.kernalContext().clientNode())
- sharedCtx.database().onCacheGroupsStopped(grpToStop);
+ sharedCtx.database().onCacheGroupsStopped(grpsToStop);
if (exchActions.deactivate())
sharedCtx.deactivate();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 0df7728..c5a42bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -156,6 +156,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
private final boolean failNodeOnPartitionInconsistency = Boolean.getBoolean(
IgniteSystemProperties.IGNITE_FAIL_NODE_ON_UNRECOVERABLE_PARTITION_INCONSISTENCY);
+ /** Batch size for cache removals during destroy. */
+ private static final int BATCH_SIZE = 1000;
+
/** */
protected GridCacheSharedContext ctx;
@@ -292,8 +295,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (grp.sharedGroup()) {
assert cacheId != CU.UNDEFINED_CACHE_ID;
- for (CacheDataStore store : cacheDataStores())
+ for (CacheDataStore store : cacheDataStores()) {
store.clear(cacheId);
+ }
// Clear non-persistent pending tree if needed.
if (pendingEntries != null) {
@@ -2954,7 +2958,17 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
GridCursor<? extends CacheDataRow> cur =
cursor(cacheId, null, null, CacheDataRowAdapter.RowData.KEY_ONLY);
+ int rmv = 0;
+
while (cur.next()) {
+ if (++rmv == BATCH_SIZE) {
+ ctx.database().checkpointReadUnlock();
+
+ rmv = 0;
+
+ ctx.database().checkpointReadLock();
+ }
+
CacheDataRow row = cur.get();
assert row.link() != 0 : row;
@@ -2980,6 +2994,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (ex != null)
throw new IgniteCheckedException("Fail destroy store", ex);
+
+ // Allow checkpointer to progress if a partition contains less than BATCH_SIZE keys.
+ ctx.database().checkpointReadUnlock();
+
+ ctx.database().checkpointReadLock();
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 40055f6..667fa74 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2328,7 +2328,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
comp.onDoneBeforeTopologyUnlock(this);
- // Create and destory caches and cache proxies.
+ // Create and destroy caches and cache proxies.
cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
cctx.kernalContext().authentication().onActivate();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 3867a63..b922c35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -2590,35 +2590,31 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/** {@inheritDoc} */
@Override public void clear(int cacheId) throws IgniteCheckedException {
+ assert ctx.database().checkpointLockIsHeldByThread();
+
CacheDataStore delegate0 = init0(true);
if (delegate0 == null)
return;
- ctx.database().checkpointReadLock();
- try {
- // Clear persistent pendingTree
- if (pendingTree != null) {
- PendingRow row = new PendingRow(cacheId);
+ // Clear persistent pendingTree
+ if (pendingTree != null) {
+ PendingRow row = new PendingRow(cacheId);
- GridCursor<PendingRow> cursor = pendingTree.find(row, row, PendingEntriesTree.WITHOUT_KEY);
+ GridCursor<PendingRow> cursor = pendingTree.find(row, row, PendingEntriesTree.WITHOUT_KEY);
- while (cursor.next()) {
- PendingRow row0 = cursor.get();
+ while (cursor.next()) {
+ PendingRow row0 = cursor.get();
- assert row0.link != 0 : row;
+ assert row0.link != 0 : row;
- boolean res = pendingTree.removex(row0);
+ boolean res = pendingTree.removex(row0);
- assert res;
- }
+ assert res;
}
-
- delegate0.clear(cacheId);
- }
- finally {
- ctx.database().checkpointReadUnlock();
}
+
+ delegate0.clear(cacheId);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java
index 722686e..43d8a08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorFindAndDeleteGarbageInPersistenceClosure.java
@@ -174,7 +174,13 @@ public class VisorFindAndDeleteGarbageInPersistenceClosure implements IgniteCall
assert groupContext != null;
for (Integer cacheId : e.getValue().keySet()) {
- groupContext.offheap().stopCache(cacheId, true);
+ groupContext.shared().database().checkpointReadLock();
+ try {
+ groupContext.offheap().stopCache(cacheId, true);
+ }
+ finally {
+ groupContext.shared().database().checkpointReadUnlock();
+ }
((GridCacheOffheapManager)
groupContext.offheap()).findAndCleanupLostIndexesForStoppedCache(cacheId);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheAbstractTest.java
index 56042bf..f870fb6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheAbstractTest.java
@@ -19,9 +19,12 @@ package org.apache.ignite.internal.processors.cache.persistence;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -31,6 +34,8 @@ import org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+
/**
* Base class for {@link IgnitePdsDestroyCacheTest} and {@link IgnitePdsDestroyCacheWithoutCheckpointsTest}
*/
@@ -42,16 +47,18 @@ public abstract class IgnitePdsDestroyCacheAbstractTest extends GridCommonAbstra
protected static final int NODES = 3;
/** */
- private static final int NUM_OF_KEYS = 100;
+ private static final int NUM_OF_KEYS = 2000;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
return cfg.setDataStorageConfiguration(new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
- .setMaxSize(200 * 1024 * 1024)
- .setPersistenceEnabled(true)));
+ .setWalSegmentSize(4 * 1024 * 1024)
+ .setPageSize(1024)
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setMaxSize(200 * 1024 * 1024)
+ .setPersistenceEnabled(true)));
}
/** {@inheritDoc} */
@@ -72,13 +79,19 @@ public abstract class IgnitePdsDestroyCacheAbstractTest extends GridCommonAbstra
/**
* @param ignite Ignite.
+ * @param singlePart {@code True} to load data to single partition.
*/
- private void loadCaches(Ignite ignite) {
+ protected void loadCaches(Ignite ignite, boolean singlePart) {
+ final List<Integer> keys;
+
+ keys = singlePart ? partitionKeys(ignite.cache(cacheName(0)), 1, NUM_OF_KEYS, 0) :
+ IntStream.range(0, NUM_OF_KEYS).boxed().collect(Collectors.toList());
+
for (int i = 0; i < CACHES; i++) {
try (IgniteDataStreamer<Object, Object> s = ignite.dataStreamer(cacheName(i))) {
s.allowOverwrite(true);
- for (int j = 0; j < NUM_OF_KEYS; j++)
+ for (Integer j : keys)
s.addData(j, "cache: " + i + " data: " + j);
s.flush();
@@ -90,7 +103,7 @@ public abstract class IgnitePdsDestroyCacheAbstractTest extends GridCommonAbstra
* @param ignite Ignite.
*/
protected void checkDestroyCaches(Ignite ignite) throws Exception {
- loadCaches(ignite);
+ loadCaches(ignite, false);
log.warning("destroying caches....");
@@ -127,7 +140,7 @@ public abstract class IgnitePdsDestroyCacheAbstractTest extends GridCommonAbstra
* @param ignite Ignite instance.
*/
protected void checkDestroyCachesAbruptly(Ignite ignite) throws Exception {
- loadCaches(ignite);
+ loadCaches(ignite, false);
log.warning("Destroying caches");
@@ -176,13 +189,21 @@ public abstract class IgnitePdsDestroyCacheAbstractTest extends GridCommonAbstra
* @param ignite Ignite instance.
*/
protected void startGroupCachesDynamically(Ignite ignite) {
+ startGroupCachesDynamically(ignite, false);
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ * @param loc {@code True} for local caches.
+ */
+ protected void startGroupCachesDynamically(Ignite ignite, boolean loc) {
List<CacheConfiguration> ccfg = new ArrayList<>(CACHES);
for (int i = 0; i < CACHES; i++)
- ccfg.add(new CacheConfiguration<>(cacheName(i))
- .setGroupName(i % 2 == 0 ? "grp-even" : "grp-odd")
- .setBackups(1)
- .setAffinity(new RendezvousAffinityFunction(false, 32)));
+ ccfg.add(new CacheConfiguration<>(cacheName(i)).setCacheMode(loc ? LOCAL : CacheMode.PARTITIONED)
+ .setGroupName(i % 2 == 0 ? "grp-even" : "grp-odd")
+ .setBackups(1)
+ .setAffinity(new RendezvousAffinityFunction(false, 32)));
ignite.createCaches(ccfg);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java
index ed274ec..d5f7e33 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java
@@ -17,9 +17,20 @@
package org.apache.ignite.internal.processors.cache.persistence;
+import java.lang.reflect.Field;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
/**
* Test correct clean up cache configuration data after destroying cache.
@@ -90,4 +101,94 @@ public class IgnitePdsDestroyCacheTest extends IgnitePdsDestroyCacheAbstractTest
checkDestroyCachesAbruptly(ignite);
}
+
+    /**
+     * Tests that a checkpoint is not blocked forever by a concurrent destroy of a partitioned (DHT) cache.
+     */
+    @Test
+    public void testDestroyCacheOperationNotBlockingCheckpointTest() throws Exception {
+        doTestDestroyCacheOperationNotBlockingCheckpointTest(false);
+    }
+
+    /**
+     * Tests that a checkpoint is not blocked forever by concurrently stopping a LOCAL cache.
+     */
+    @Test
+    public void testDestroyCacheOperationNotBlockingCheckpointTest_LocalCache() throws Exception {
+        doTestDestroyCacheOperationNotBlockingCheckpointTest(true);
+    }
+
+    /**
+     * @param loc {@code True} to use LOCAL caches (stopped via {@code close()}), {@code false} for PARTITIONED ones.
+     */
+    private void doTestDestroyCacheOperationNotBlockingCheckpointTest(boolean loc) throws Exception {
+        final IgniteEx ignite = startGrids(1);
+
+        ignite.cluster().active(true);
+
+        startGroupCachesDynamically(ignite, loc);
+
+        loadCaches(ignite, !loc);
+
+        // It's important to clear a cache that lives in a group containing more than one cache.
+        final String cacheName = cacheName(0);
+        final CacheGroupContext grp = ignite.cachex(cacheName).context().group();
+
+        final IgniteCacheOffheapManager offheap = grp.offheap();
+
+        IgniteCacheOffheapManager mgr = Mockito.spy(offheap);
+
+        final CountDownLatch checkpointLocked = new CountDownLatch(1);
+        final CountDownLatch cpFutCreated = new CountDownLatch(1);
+        final CountDownLatch realMtdCalled = new CountDownLatch(1);
+        final CountDownLatch checked = new CountDownLatch(1);
+
+        Mockito.doAnswer(invocation -> {
+            checkpointLocked.countDown();
+
+            assertTrue(U.await(cpFutCreated, 30, TimeUnit.SECONDS));
+
+            Object ret = invocation.callRealMethod();
+
+            // Clearing is done: the cp future must eventually complete, and the cp read lock has been reacquired.
+            realMtdCalled.countDown();
+
+            // Keep holding the lock until the checker thread has waited on the checkpoint future.
+            U.awaitQuiet(checked);
+
+            return ret;
+        }).when(mgr).stopCache(Mockito.anyInt(), Mockito.anyBoolean());
+
+        final Field field = U.findField(CacheGroupContext.class, "offheapMgr");
+        field.set(grp, mgr); // Route the cache stop through the stubbed stopCache() of the spy.
+
+        final IgniteInternalFuture<Object> fut = runAsync(() -> {
+            assertTrue(U.await(checkpointLocked, 30, TimeUnit.SECONDS));
+
+            // Trigger a checkpoint while the destroying thread holds the checkpoint read lock.
+            final IgniteInternalFuture cpFut = ignite.context().cache().context().database().wakeupForCheckpoint("test");
+
+            assertFalse(cpFut.isDone()); // Destroying thread is parked in the stubbed stopCache() holding the lock.
+
+            cpFutCreated.countDown();
+
+            assertTrue(U.await(realMtdCalled, 30, TimeUnit.SECONDS));
+
+            try {
+                cpFut.get(3_000); // Future must be completed after cache clearing but before releasing checkpoint lock.
+            }
+            finally {
+                checked.countDown();
+            }
+
+            return null;
+        });
+
+        if (loc)
+            ignite.cache(cacheName).close();
+        else
+            ignite.destroyCache(cacheName);
+
+        fut.get();
+    }
}