You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2020/07/28 08:32:02 UTC
[ignite] branch master updated: IGNITE-13295 Fixed AssertionError
on expiration of cache entries,
that relate to a non-persistent data region in the persistent cluster.
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 33bda0e IGNITE-13295 Fixed AssertionError on expiration of cache entries, that relate to a non-persistent data region in the persistent cluster.
33bda0e is described below
commit 33bda0e51ea35d3eb47d48d722d5436bbc91196f
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Tue Jul 28 11:31:29 2020 +0300
IGNITE-13295 Fixed AssertionError on expiration of cache entries, that relate to a non-persistent data region in the persistent cluster.
---
.../cache/IgniteCacheOffheapManagerImpl.java | 66 ++---
.../IgnitePdsWithTtlDeactivateOnHighloadTest.java | 291 --------------------
.../cache/persistence/db/IgnitePdsWithTtlTest.java | 293 ++++++++++++++++++---
.../ignite/testsuites/IgnitePdsTestSuite2.java | 3 -
4 files changed, 289 insertions(+), 364 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 8b8f587..fd0d9f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -295,9 +295,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (grp.sharedGroup()) {
assert cacheId != CU.UNDEFINED_CACHE_ID;
- for (CacheDataStore store : cacheDataStores()) {
+ for (CacheDataStore store : cacheDataStores())
store.clear(cacheId);
- }
// Clear non-persistent pending tree if needed.
if (pendingEntries != null) {
@@ -1387,49 +1386,56 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
GridCursor<PendingRow> cur;
- if (grp.sharedGroup())
- cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0));
- else
- cur = pendingEntries.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
+ cctx.shared().database().checkpointReadLock();
- if (!cur.next())
- return 0;
+ try {
+ if (grp.sharedGroup())
+ cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0));
+ else
+ cur = pendingEntries.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
- if (!busyLock.enterBusy())
- return 0;
+ if (!cur.next())
+ return 0;
- try {
- int cleared = 0;
+ if (!busyLock.enterBusy())
+ return 0;
- do {
- if (amount != -1 && cleared > amount)
- return cleared;
+ try {
+ int cleared = 0;
- PendingRow row = cur.get();
+ do {
+ if (amount != -1 && cleared > amount)
+ return cleared;
- if (row.key.partition() == -1)
- row.key.partition(cctx.affinity().partition(row.key));
+ PendingRow row = cur.get();
- assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
+ if (row.key.partition() == -1)
+ row.key.partition(cctx.affinity().partition(row.key));
- if (pendingEntries.removex(row)) {
- if (obsoleteVer == null)
- obsoleteVer = cctx.cache().nextVersion();
+ assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
- GridCacheEntryEx entry = cctx.cache().entryEx(row.key);
+ if (pendingEntries.removex(row)) {
+ if (obsoleteVer == null)
+ obsoleteVer = cctx.cache().nextVersion();
+
+ GridCacheEntryEx entry = cctx.cache().entryEx(row.key);
+
+ if (entry != null)
+ c.apply(entry, obsoleteVer);
+ }
- if (entry != null)
- c.apply(entry, obsoleteVer);
+ cleared++;
}
+ while (cur.next());
- cleared++;
+ return cleared;
+ }
+ finally {
+ busyLock.leaveBusy();
}
- while (cur.next());
-
- return cleared;
}
finally {
- busyLock.leaveBusy();
+ cctx.shared().database().checkpointReadUnlock();
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java
deleted file mode 100644
index 47a5779..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java
+++ /dev/null
@@ -1,291 +0,0 @@
-package org.apache.ignite.internal.processors.cache.persistence.db;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.cache.expiry.AccessedExpiryPolicy;
-import javax.cache.expiry.Duration;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.CacheRebalanceMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cluster.ClusterState;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.failure.FailureContext;
-import org.apache.ignite.failure.FailureHandler;
-import org.apache.ignite.failure.NoOpFailureHandler;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.MvccFeatureChecker;
-import org.apache.ignite.testframework.junits.WithSystemProperty;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Test;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.testframework.GridTestUtils.runAsync;
-import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
-import static org.apache.ignite.testframework.GridTestUtils.waitForAllFutures;
-
-/**
- * Test TTL worker with persistence enabled
- */
-@WithSystemProperty(key = IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, value = "5")
-public class IgnitePdsWithTtlDeactivateOnHighloadTest extends GridCommonAbstractTest {
- /** */
- private static final String CACHE_NAME_ATOMIC = "expirable-cache-atomic";
-
- /** */
- private static final String CACHE_NAME_TX = "expirable-cache-tx";
-
- /** */
- private static final String CACHE_NAME_LOCAL_ATOMIC = "expirable-cache-local-atomic";
-
- /** */
- private static final String CACHE_NAME_LOCAL_TX = "expirable-cache-local-tx";
-
- /** */
- private static final String CACHE_NAME_NEAR_ATOMIC = "expirable-cache-near-atomic";
-
- /** */
- private static final String CACHE_NAME_NEAR_TX = "expirable-cache-near-tx";
-
- /** */
- private static final int PART_SIZE = 2;
-
- /** */
- private static final int EXPIRATION_TIMEOUT = 10;
-
- /** */
- private static final int ENTRIES = 10;
-
- /** */
- private static final int WORKLOAD_THREADS_CNT = 16;
-
- /** Fail. */
- private volatile boolean fail;
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.EXPIRATION);
-
- super.beforeTest();
-
- stopAllGrids();
-
- cleanPersistenceDir();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- super.afterTest();
-
- stopAllGrids();
-
- cleanPersistenceDir();
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- final IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
- cfg.setDataStorageConfiguration(
- new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(
- new DataRegionConfiguration()
- .setMaxSize(2L * 1024 * 1024 * 1024)
- .setPersistenceEnabled(true)
- ).setWalMode(WALMode.LOG_ONLY));
-
- cfg.setCacheConfiguration(
- getCacheConfiguration(CACHE_NAME_ATOMIC).setAtomicityMode(ATOMIC),
- getCacheConfiguration(CACHE_NAME_TX).setAtomicityMode(TRANSACTIONAL),
- getCacheConfiguration(CACHE_NAME_LOCAL_ATOMIC).setAtomicityMode(ATOMIC).setCacheMode(CacheMode.LOCAL),
- getCacheConfiguration(CACHE_NAME_LOCAL_TX).setAtomicityMode(TRANSACTIONAL).setCacheMode(CacheMode.LOCAL),
- getCacheConfiguration(CACHE_NAME_NEAR_ATOMIC).setAtomicityMode(ATOMIC)
- .setNearConfiguration(new NearCacheConfiguration<>()),
- getCacheConfiguration(CACHE_NAME_NEAR_TX).setAtomicityMode(TRANSACTIONAL)
- .setNearConfiguration(new NearCacheConfiguration<>())
- );
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
- return new NoOpFailureHandler() {
- @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
- fail = true;
-
- return super.handle(ignite, failureCtx);
- }
- };
- }
-
- /**
- * Returns a new cache configuration with the given name and {@code GROUP_NAME} group.
- *
- * @param name Cache name.
- * @return Cache configuration.
- */
- private CacheConfiguration<?, ?> getCacheConfiguration(String name) {
- CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>();
-
- ccfg.setName(name);
- ccfg.setAffinity(new RendezvousAffinityFunction(false, PART_SIZE));
- ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, EXPIRATION_TIMEOUT)));
- ccfg.setEagerTtl(true);
- ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
-
- return ccfg;
- }
-
- /**
- * @throws Exception if failed.
- */
- @Test
- @SuppressWarnings("LockAcquiredButNotSafelyReleased")
- public void shouldNotBeProblemToPutToExpiredCacheConcurrentlyWithCheckpoint() throws Exception {
- IgniteEx ig0 = startGrid(0);
-
- ig0.cluster().state(ClusterState.ACTIVE);
-
- IgniteCache<Object, Object> cache = ig0.getOrCreateCache(CACHE_NAME_ATOMIC);
-
- AtomicBoolean timeoutReached = new AtomicBoolean(false);
-
- GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig0.context().cache().context().database();
-
- IgniteInternalFuture<?> ldrFut = runMultiThreadedAsync(() -> {
- while (!timeoutReached.get()) {
- Map<Object, Object> map = new TreeMap<>();
-
- for (int i = 0; i < ENTRIES; i++)
- map.put(i, i);
-
- cache.putAll(map);
- }
- }, WORKLOAD_THREADS_CNT, "loader");
-
- IgniteInternalFuture<?> updaterFut = runMultiThreadedAsync(() -> {
- while (!timeoutReached.get()) {
- for (int i = 0; i < ENTRIES; i++)
- cache.put(i, i * 10);
- }
- }, WORKLOAD_THREADS_CNT, "updater");
-
- IgniteInternalFuture<?> cpWriteLockUnlockFut = runAsync(() -> {
- ReentrantReadWriteLock lock = U.field(db, "checkpointLock");
-
- while (!timeoutReached.get()) {
- try {
- lock.writeLock().lockInterruptibly();
-
- doSleep(30);
- }
- catch (InterruptedException ignored) {
- break;
- }
- finally {
- lock.writeLock().unlock();
- }
-
- doSleep(30);
- }
- }, "cp-write-lock-holder");
-
- doSleep(10_000);
-
- timeoutReached.set(true);
-
- waitForAllFutures(cpWriteLockUnlockFut, ldrFut, updaterFut);
- }
-
- /**
- * @throws Exception if failed.
- */
- @Test
- public void shouldNotBeProblemToPutToExpiredCacheConcurrently() throws Exception {
- final AtomicBoolean end = new AtomicBoolean();
-
- final IgniteEx srv = startGrids(3);
-
- srv.cluster().state(ClusterState.ACTIVE);
-
- // Start high workload.
- IgniteInternalFuture<?> loadFut = runMultiThreadedAsync(() -> {
- List<IgniteCache<Object, Object>> caches = F.asList(
- srv.cache(CACHE_NAME_ATOMIC),
- srv.cache(CACHE_NAME_TX),
- srv.cache(CACHE_NAME_LOCAL_ATOMIC),
- srv.cache(CACHE_NAME_LOCAL_TX),
- srv.cache(CACHE_NAME_NEAR_ATOMIC),
- srv.cache(CACHE_NAME_NEAR_TX)
- );
-
- while (!end.get() && !fail) {
- for (IgniteCache<Object, Object> cache : caches) {
- for (int i = 0; i < ENTRIES; i++)
- cache.put(i, new byte[1024]);
-
- cache.putAll(new TreeMap<>(F.asMap(0, new byte[1024], 1, new byte[1024])));
-
- for (int i = 0; i < ENTRIES; i++)
- cache.get(i);
-
- cache.getAll(new TreeSet<>(F.asList(0, 1)));
- }
- }
- }, WORKLOAD_THREADS_CNT, "high-workload");
-
- try {
- // Let's wait some time.
- loadFut.get(10, TimeUnit.SECONDS);
- }
- catch (Exception e) {
- assertFalse("Failure handler was called. See log above.", fail);
-
- assertTrue(X.hasCause(e, IgniteFutureTimeoutCheckedException.class));
- }
- finally {
- end.set(true);
- }
-
- assertFalse("Failure handler was called. See log above.", fail);
- }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
index 46f1423..857aadf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
@@ -17,15 +17,22 @@
package org.apache.ignite.internal.processors.cache.persistence.db;
+import java.util.List;
+import java.util.Map;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.cache.expiry.AccessedExpiryPolicy;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -34,36 +41,72 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.NoOpFailureHandler;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForAllFutures;
+
/**
* Test TTL worker with persistence enabled
*/
@WithSystemProperty(key = IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, value = "5")
public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
/** */
- public static final String CACHE_NAME = "expirableCache";
+ private static final String CACHE_NAME_ATOMIC = "expirable-cache-atomic";
+
+ /** */
+ private static final String CACHE_NAME_ATOMIC_NON_PERSISTENT = "expirable-non-persistent-cache-atomic";
+
+ /** */
+ private static final String CACHE_NAME_TX = "expirable-cache-tx";
+
+ /** */
+ private static final String CACHE_NAME_LOCAL_ATOMIC = "expirable-cache-local-atomic";
+
+ /** */
+ private static final String CACHE_NAME_LOCAL_TX = "expirable-cache-local-tx";
+
+ /** */
+ private static final String CACHE_NAME_NEAR_ATOMIC = "expirable-cache-near-atomic";
+
+ /** */
+ private static final String CACHE_NAME_NEAR_TX = "expirable-cache-near-tx";
/** */
- public static final String GROUP_NAME = "group1";
+ private static final String NON_PERSISTENT_DATA_REGION = "non-persistent-region";
/** */
- public static final int PART_SIZE = 32;
+ public static final int PART_SIZE = 2;
/** */
private static final int EXPIRATION_TIMEOUT = 10;
@@ -71,15 +114,14 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
/** */
public static final int ENTRIES = 50_000;
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
- }
+ /** */
+ public static final int SMALL_ENTRIES = 10;
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- super.afterTestsStopped();
- }
+ /** */
+ private static final int WORKLOAD_THREADS_CNT = 16;
+
+ /** Set to {@code true} once the failure handler has been invoked. */
+ private volatile boolean failureHndTriggered;
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
@@ -104,31 +146,58 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
final IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ DataRegionConfiguration dfltRegion = new DataRegionConfiguration()
+ .setMaxSize(2L * 1024 * 1024 * 1024)
+ .setPersistenceEnabled(true);
+
+ DataRegionConfiguration nonPersistentRegion = new DataRegionConfiguration()
+ .setName(NON_PERSISTENT_DATA_REGION)
+ .setMaxSize(2L * 1024 * 1024 * 1024)
+ .setPersistenceEnabled(false);
+
cfg.setDataStorageConfiguration(
new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(
- new DataRegionConfiguration()
- .setMaxSize(2L * 1024 * 1024 * 1024)
- .setPersistenceEnabled(true)
- ).setWalMode(WALMode.LOG_ONLY));
-
- cfg.setCacheConfiguration(getCacheConfiguration(CACHE_NAME));
+ .setDefaultDataRegionConfiguration(dfltRegion)
+ .setDataRegionConfigurations(nonPersistentRegion)
+ .setWalMode(WALMode.LOG_ONLY));
+
+ cfg.setCacheConfiguration(
+ getCacheConfiguration(CACHE_NAME_ATOMIC).setAtomicityMode(ATOMIC),
+ getCacheConfiguration(CACHE_NAME_TX).setAtomicityMode(TRANSACTIONAL),
+ getCacheConfiguration(CACHE_NAME_LOCAL_ATOMIC).setAtomicityMode(ATOMIC).setCacheMode(CacheMode.LOCAL),
+ getCacheConfiguration(CACHE_NAME_LOCAL_TX).setAtomicityMode(TRANSACTIONAL).setCacheMode(CacheMode.LOCAL),
+ getCacheConfiguration(CACHE_NAME_NEAR_ATOMIC).setAtomicityMode(ATOMIC)
+ .setNearConfiguration(new NearCacheConfiguration<>()),
+ getCacheConfiguration(CACHE_NAME_NEAR_TX).setAtomicityMode(TRANSACTIONAL)
+ .setNearConfiguration(new NearCacheConfiguration<>())
+ );
return cfg;
}
+ /** {@inheritDoc} */
+ @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+ return new NoOpFailureHandler() {
+ @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
+ failureHndTriggered = true;
+
+ return super.handle(ignite, failureCtx);
+ }
+ };
+ }
+
/**
* Returns a new cache configuration with the given name and {@code GROUP_NAME} group.
+ *
* @param name Cache name.
* @return Cache configuration.
*/
- private CacheConfiguration getCacheConfiguration(String name) {
- CacheConfiguration ccfg = new CacheConfiguration();
+ private CacheConfiguration<?, ?> getCacheConfiguration(String name) {
+ CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>();
ccfg.setName(name);
- ccfg.setGroupName(GROUP_NAME);
ccfg.setAffinity(new RendezvousAffinityFunction(false, PART_SIZE));
- ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, EXPIRATION_TIMEOUT)));
+ ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, EXPIRATION_TIMEOUT)));
ccfg.setEagerTtl(true);
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
@@ -150,23 +219,23 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
@Test
public void testTtlIsAppliedForMultipleCaches() throws Exception {
IgniteEx srv = startGrid(0);
- srv.cluster().active(true);
+ srv.cluster().state(ACTIVE);
int cacheCnt = 2;
// Create a new caches in the same group.
// It is important that initially created cache CACHE_NAME remains empty.
for (int i = 0; i < cacheCnt; ++i) {
- String cacheName = CACHE_NAME + "-" + i;
+ String cacheName = CACHE_NAME_ATOMIC + "-" + i;
srv.getOrCreateCache(getCacheConfiguration(cacheName));
fillCache(srv.cache(cacheName));
}
- waitAndCheckExpired(srv, srv.cache(CACHE_NAME + "-" + (cacheCnt - 1)));
+ waitAndCheckExpired(srv, srv.cache(CACHE_NAME_ATOMIC + "-" + (cacheCnt - 1)));
- srv.cluster().active(false);
+ srv.cluster().state(INACTIVE);
stopAllGrids();
}
@@ -182,27 +251,141 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
/**
* @throws Exception if failed.
*/
+ @Test
+ public void testPutOpsIntoCacheWithExpirationConcurrentlyWithCheckpointCompleteSuccessfully() throws Exception {
+ IgniteEx ig0 = startGrid(0);
+
+ ig0.cluster().state(ACTIVE);
+
+ IgniteCache<Object, Object> cache = ig0.getOrCreateCache(CACHE_NAME_ATOMIC);
+
+ AtomicBoolean timeoutReached = new AtomicBoolean(false);
+
+ GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig0.context().cache().context().database();
+
+ IgniteInternalFuture<?> ldrFut = runMultiThreadedAsync(() -> {
+ while (!timeoutReached.get()) {
+ Map<Object, Object> map = new TreeMap<>();
+
+ for (int i = 0; i < ENTRIES; i++)
+ map.put(i, i);
+
+ cache.putAll(map);
+ }
+ }, WORKLOAD_THREADS_CNT, "loader");
+
+ IgniteInternalFuture<?> updaterFut = runMultiThreadedAsync(() -> {
+ while (!timeoutReached.get()) {
+ for (int i = 0; i < SMALL_ENTRIES; i++)
+ cache.put(i, i * 10);
+ }
+ }, WORKLOAD_THREADS_CNT, "updater");
+
+ IgniteInternalFuture<?> cpWriteLockUnlockFut = runAsync(() -> {
+ ReentrantReadWriteLock lock = U.field(db, "checkpointLock");
+
+ while (!timeoutReached.get()) {
+ try {
+ lock.writeLock().lockInterruptibly();
+
+ doSleep(30);
+ }
+ catch (InterruptedException ignored) {
+ break;
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+
+ doSleep(30);
+ }
+ }, "cp-write-lock-holder");
+
+ doSleep(10_000);
+
+ timeoutReached.set(true);
+
+ waitForAllFutures(cpWriteLockUnlockFut, ldrFut, updaterFut);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testConcurrentPutOpsToCacheWithExpirationCompleteSuccesfully() throws Exception {
+ final AtomicBoolean end = new AtomicBoolean();
+
+ final IgniteEx srv = startGrids(3);
+
+ srv.cluster().state(ACTIVE);
+
+ // Start high workload.
+ IgniteInternalFuture<?> loadFut = runMultiThreadedAsync(() -> {
+ List<IgniteCache<Object, Object>> caches = F.asList(
+ srv.cache(CACHE_NAME_ATOMIC),
+ srv.cache(CACHE_NAME_TX),
+ srv.cache(CACHE_NAME_LOCAL_ATOMIC),
+ srv.cache(CACHE_NAME_LOCAL_TX),
+ srv.cache(CACHE_NAME_NEAR_ATOMIC),
+ srv.cache(CACHE_NAME_NEAR_TX)
+ );
+
+ while (!end.get() && !failureHndTriggered) {
+ for (IgniteCache<Object, Object> cache : caches) {
+ for (int i = 0; i < SMALL_ENTRIES; i++)
+ cache.put(i, new byte[1024]);
+
+ cache.putAll(new TreeMap<>(F.asMap(0, new byte[1024], 1, new byte[1024])));
+
+ for (int i = 0; i < SMALL_ENTRIES; i++)
+ cache.get(i);
+
+ cache.getAll(new TreeSet<>(F.asList(0, 1)));
+ }
+ }
+ }, WORKLOAD_THREADS_CNT, "high-workload");
+
+ try {
+ // Let's wait some time.
+ loadFut.get(10, TimeUnit.SECONDS);
+ }
+ catch (Exception e) {
+ assertFalse("Failure handler was called. See log above.", failureHndTriggered);
+
+ assertTrue(X.hasCause(e, IgniteFutureTimeoutCheckedException.class));
+ }
+ finally {
+ end.set(true);
+ }
+
+ assertFalse("Failure handler was called. See log above.", failureHndTriggered);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
private void loadAndWaitForCleanup(boolean restartGrid) throws Exception {
IgniteEx srv = startGrid(0);
- srv.cluster().active(true);
- fillCache(srv.cache(CACHE_NAME));
+ srv.cluster().state(ACTIVE);
+
+ fillCache(srv.cache(CACHE_NAME_ATOMIC));
if (restartGrid) {
srv.context().cache().context().database().waitForCheckpoint("test-checkpoint");
stopGrid(0);
srv = startGrid(0);
- srv.cluster().active(true);
+ srv.cluster().state(ACTIVE);
}
- final IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME);
+ final IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME_ATOMIC);
printStatistics((IgniteCacheProxy)cache, "After restart from LFS");
waitAndCheckExpired(srv, cache);
- srv.cluster().active(false);
+ srv.cluster().state(INACTIVE);
stopAllGrids();
}
@@ -215,22 +398,22 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
IgniteEx srv = startGrid(0);
srv.cluster().baselineAutoAdjustEnabled(false);
- srv.cluster().active(true);
+ srv.cluster().state(ACTIVE);
- fillCache(srv.cache(CACHE_NAME));
+ fillCache(srv.cache(CACHE_NAME_ATOMIC));
srv = startGrid(1);
//causes rebalancing start
srv.cluster().setBaselineTopology(srv.cluster().topologyVersion());
- final IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME);
+ final IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME_ATOMIC);
printStatistics((IgniteCacheProxy)cache, "After rebalancing start");
waitAndCheckExpired(srv, cache);
- srv.cluster().active(false);
+ srv.cluster().state(INACTIVE);
stopAllGrids();
}
@@ -250,13 +433,13 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
ExpiryPolicy plc = CreatedExpiryPolicy.factoryOf(Duration.ONE_DAY).create();
- IgniteCache<Integer, byte[]> cache0 = srv.cache(CACHE_NAME);
+ IgniteCache<Integer, byte[]> cache0 = srv.cache(CACHE_NAME_ATOMIC);
fillCache(cache0.withExpiryPolicy(plc));
srv = startGrid(2);
- IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME);
+ IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME_ATOMIC);
//causes rebalancing start
srv.cluster().setBaselineTopology(srv.cluster().topologyVersion());
@@ -274,13 +457,43 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
stopGrid(1);
startGrid(1);
- srv.cluster().active(false);
+ srv.cluster().state(INACTIVE);
}
finally {
stopAllGrids();
}
}
+ /**
+ * Tests that entries of a cache assigned to a non-persistent data region expire correctly.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testExpirationNonPersistentRegion() throws Exception {
+ IgniteEx srv = startGrid(0);
+
+ srv.cluster().baselineAutoAdjustEnabled(false);
+ srv.cluster().state(ACTIVE);
+
+ CacheConfiguration<?, ?> cfg =
+ getCacheConfiguration(CACHE_NAME_ATOMIC_NON_PERSISTENT)
+ .setAtomicityMode(ATOMIC)
+ .setDataRegionName(NON_PERSISTENT_DATA_REGION);
+
+ srv.getOrCreateCache(cfg);
+
+ IgniteCache<Integer, byte[]> nonPersistentCache = srv.cache(CACHE_NAME_ATOMIC_NON_PERSISTENT);
+
+ fillCache(nonPersistentCache);
+
+ waitAndCheckExpired(srv, nonPersistentCache);
+
+ stopAllGrids();
+
+ assertFalse("Failure handler should not be triggered.", failureHndTriggered);
+ }
+
/** */
protected void fillCache(IgniteCache<Integer, byte[]> cache) {
cache.putAll(new TreeMap<Integer, byte[]>() {{
@@ -311,7 +524,7 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
printStatistics((IgniteCacheProxy)cache, "After timeout");
GridCacheSharedContext ctx = srv.context().cache().context();
- GridCacheContext cctx = ctx.cacheContext(CU.cacheId(CACHE_NAME));
+ GridCacheContext cctx = ctx.cacheContext(CU.cacheId(CACHE_NAME_ATOMIC));
// Check partitions through internal API.
for (int partId = 0; partId < PART_SIZE; ++partId) {
@@ -321,7 +534,7 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
continue;
IgniteCacheOffheapManager.CacheDataStore dataStore =
- ctx.cache().cacheGroup(CU.cacheId(GROUP_NAME)).offheap().dataStore(locPart);
+ ctx.cache().cacheGroup(CU.cacheId(CACHE_NAME_ATOMIC)).offheap().dataStore(locPart);
GridCursor cur = dataStore.cursor();
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 2f065c4..d930b3b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebal
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsWithCompactionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest;
-import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlDeactivateOnHighloadTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgniteShutdownOnSupplyMessageFailureTest;
import org.apache.ignite.internal.processors.cache.persistence.db.SlowHistoricalRebalanceSmallHistoryTest;
import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointFailBeforeWriteMarkTest;
@@ -257,8 +256,6 @@ public class IgnitePdsTestSuite2 {
GridTestUtils.addTestIfNeeded(suite, IgnitePdsPartitionsStateRecoveryTest.class, ignoredTests);
- GridTestUtils.addTestIfNeeded(suite, IgnitePdsWithTtlDeactivateOnHighloadTest.class, ignoredTests);
-
GridTestUtils.addTestIfNeeded(suite, WalPreloadingConcurrentTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteWalRebalanceLoggingTest.class, ignoredTests);