You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2020/07/28 08:32:02 UTC

[ignite] branch master updated: IGNITE-13295 Fixed AssertionError on expiration of cache entries, that relate to a non-persistent data region in the persistent cluster.

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 33bda0e  IGNITE-13295 Fixed AssertionError on expiration of cache entries, that relate to a non-persistent data region in the persistent cluster.
33bda0e is described below

commit 33bda0e51ea35d3eb47d48d722d5436bbc91196f
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Tue Jul 28 11:31:29 2020 +0300

    IGNITE-13295 Fixed AssertionError on expiration of cache entries, that relate to a non-persistent data region in the persistent cluster.
---
 .../cache/IgniteCacheOffheapManagerImpl.java       |  66 ++---
 .../IgnitePdsWithTtlDeactivateOnHighloadTest.java  | 291 --------------------
 .../cache/persistence/db/IgnitePdsWithTtlTest.java | 293 ++++++++++++++++++---
 .../ignite/testsuites/IgnitePdsTestSuite2.java     |   3 -
 4 files changed, 289 insertions(+), 364 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 8b8f587..fd0d9f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -295,9 +295,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             if (grp.sharedGroup()) {
                 assert cacheId != CU.UNDEFINED_CACHE_ID;
 
-                for (CacheDataStore store : cacheDataStores()) {
+                for (CacheDataStore store : cacheDataStores())
                     store.clear(cacheId);
-                }
 
                 // Clear non-persistent pending tree if needed.
                 if (pendingEntries != null) {
@@ -1387,49 +1386,56 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
         GridCursor<PendingRow> cur;
 
-        if (grp.sharedGroup())
-            cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0));
-        else
-            cur = pendingEntries.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
+        cctx.shared().database().checkpointReadLock();
 
-        if (!cur.next())
-            return 0;
+        try {
+            if (grp.sharedGroup())
+                cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0));
+            else
+                cur = pendingEntries.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
 
-        if (!busyLock.enterBusy())
-            return 0;
+            if (!cur.next())
+                return 0;
 
-        try {
-            int cleared = 0;
+            if (!busyLock.enterBusy())
+                return 0;
 
-            do {
-                if (amount != -1 && cleared > amount)
-                    return cleared;
+            try {
+                int cleared = 0;
 
-                PendingRow row = cur.get();
+                do {
+                    if (amount != -1 && cleared > amount)
+                        return cleared;
 
-                if (row.key.partition() == -1)
-                    row.key.partition(cctx.affinity().partition(row.key));
+                    PendingRow row = cur.get();
 
-                assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
+                    if (row.key.partition() == -1)
+                        row.key.partition(cctx.affinity().partition(row.key));
 
-                if (pendingEntries.removex(row)) {
-                    if (obsoleteVer == null)
-                        obsoleteVer = cctx.cache().nextVersion();
+                    assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
 
-                    GridCacheEntryEx entry = cctx.cache().entryEx(row.key);
+                    if (pendingEntries.removex(row)) {
+                        if (obsoleteVer == null)
+                            obsoleteVer = cctx.cache().nextVersion();
+
+                        GridCacheEntryEx entry = cctx.cache().entryEx(row.key);
+
+                        if (entry != null)
+                            c.apply(entry, obsoleteVer);
+                    }
 
-                    if (entry != null)
-                        c.apply(entry, obsoleteVer);
+                    cleared++;
                 }
+                while (cur.next());
 
-                cleared++;
+                return cleared;
+            }
+            finally {
+                busyLock.leaveBusy();
             }
-            while (cur.next());
-
-            return cleared;
         }
         finally {
-            busyLock.leaveBusy();
+            cctx.shared().database().checkpointReadUnlock();
         }
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java
deleted file mode 100644
index 47a5779..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java
+++ /dev/null
@@ -1,291 +0,0 @@
-package org.apache.ignite.internal.processors.cache.persistence.db;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.cache.expiry.AccessedExpiryPolicy;
-import javax.cache.expiry.Duration;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.CacheRebalanceMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cluster.ClusterState;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.failure.FailureContext;
-import org.apache.ignite.failure.FailureHandler;
-import org.apache.ignite.failure.NoOpFailureHandler;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.MvccFeatureChecker;
-import org.apache.ignite.testframework.junits.WithSystemProperty;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Test;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.testframework.GridTestUtils.runAsync;
-import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
-import static org.apache.ignite.testframework.GridTestUtils.waitForAllFutures;
-
-/**
- * Test TTL worker with persistence enabled
- */
-@WithSystemProperty(key = IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, value = "5")
-public class IgnitePdsWithTtlDeactivateOnHighloadTest extends GridCommonAbstractTest {
-    /** */
-    private static final String CACHE_NAME_ATOMIC = "expirable-cache-atomic";
-
-    /** */
-    private static final String CACHE_NAME_TX = "expirable-cache-tx";
-
-    /** */
-    private static final String CACHE_NAME_LOCAL_ATOMIC = "expirable-cache-local-atomic";
-
-    /** */
-    private static final String CACHE_NAME_LOCAL_TX = "expirable-cache-local-tx";
-
-    /** */
-    private static final String CACHE_NAME_NEAR_ATOMIC = "expirable-cache-near-atomic";
-
-    /** */
-    private static final String CACHE_NAME_NEAR_TX = "expirable-cache-near-tx";
-
-    /** */
-    private static final int PART_SIZE = 2;
-
-    /** */
-    private static final int EXPIRATION_TIMEOUT = 10;
-
-    /** */
-    private static final int ENTRIES = 10;
-
-    /** */
-    private static final int WORKLOAD_THREADS_CNT = 16;
-
-    /** Fail. */
-    private volatile boolean fail;
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.EXPIRATION);
-
-        super.beforeTest();
-
-        stopAllGrids();
-
-        cleanPersistenceDir();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        super.afterTest();
-
-        stopAllGrids();
-
-        cleanPersistenceDir();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
-        final IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
-        cfg.setDataStorageConfiguration(
-            new DataStorageConfiguration()
-                .setDefaultDataRegionConfiguration(
-                    new DataRegionConfiguration()
-                        .setMaxSize(2L * 1024 * 1024 * 1024)
-                        .setPersistenceEnabled(true)
-                ).setWalMode(WALMode.LOG_ONLY));
-
-        cfg.setCacheConfiguration(
-            getCacheConfiguration(CACHE_NAME_ATOMIC).setAtomicityMode(ATOMIC),
-            getCacheConfiguration(CACHE_NAME_TX).setAtomicityMode(TRANSACTIONAL),
-            getCacheConfiguration(CACHE_NAME_LOCAL_ATOMIC).setAtomicityMode(ATOMIC).setCacheMode(CacheMode.LOCAL),
-            getCacheConfiguration(CACHE_NAME_LOCAL_TX).setAtomicityMode(TRANSACTIONAL).setCacheMode(CacheMode.LOCAL),
-            getCacheConfiguration(CACHE_NAME_NEAR_ATOMIC).setAtomicityMode(ATOMIC)
-                .setNearConfiguration(new NearCacheConfiguration<>()),
-            getCacheConfiguration(CACHE_NAME_NEAR_TX).setAtomicityMode(TRANSACTIONAL)
-                .setNearConfiguration(new NearCacheConfiguration<>())
-        );
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
-        return new NoOpFailureHandler() {
-            @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
-                fail = true;
-
-                return super.handle(ignite, failureCtx);
-            }
-        };
-    }
-
-    /**
-     * Returns a new cache configuration with the given name and {@code GROUP_NAME} group.
-     *
-     * @param name Cache name.
-     * @return Cache configuration.
-     */
-    private CacheConfiguration<?, ?> getCacheConfiguration(String name) {
-        CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>();
-
-        ccfg.setName(name);
-        ccfg.setAffinity(new RendezvousAffinityFunction(false, PART_SIZE));
-        ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, EXPIRATION_TIMEOUT)));
-        ccfg.setEagerTtl(true);
-        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
-
-        return ccfg;
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    @Test
-    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
-    public void shouldNotBeProblemToPutToExpiredCacheConcurrentlyWithCheckpoint() throws Exception {
-        IgniteEx ig0 = startGrid(0);
-
-        ig0.cluster().state(ClusterState.ACTIVE);
-
-        IgniteCache<Object, Object> cache = ig0.getOrCreateCache(CACHE_NAME_ATOMIC);
-
-        AtomicBoolean timeoutReached = new AtomicBoolean(false);
-
-        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig0.context().cache().context().database();
-
-        IgniteInternalFuture<?> ldrFut = runMultiThreadedAsync(() -> {
-            while (!timeoutReached.get()) {
-                Map<Object, Object> map = new TreeMap<>();
-
-                for (int i = 0; i < ENTRIES; i++)
-                    map.put(i, i);
-
-                cache.putAll(map);
-            }
-        }, WORKLOAD_THREADS_CNT, "loader");
-
-        IgniteInternalFuture<?> updaterFut = runMultiThreadedAsync(() -> {
-            while (!timeoutReached.get()) {
-                for (int i = 0; i < ENTRIES; i++)
-                    cache.put(i, i * 10);
-            }
-        }, WORKLOAD_THREADS_CNT, "updater");
-
-        IgniteInternalFuture<?> cpWriteLockUnlockFut = runAsync(() -> {
-            ReentrantReadWriteLock lock = U.field(db, "checkpointLock");
-
-            while (!timeoutReached.get()) {
-                try {
-                    lock.writeLock().lockInterruptibly();
-
-                    doSleep(30);
-                }
-                catch (InterruptedException ignored) {
-                    break;
-                }
-                finally {
-                    lock.writeLock().unlock();
-                }
-
-                doSleep(30);
-            }
-        }, "cp-write-lock-holder");
-
-        doSleep(10_000);
-
-        timeoutReached.set(true);
-
-        waitForAllFutures(cpWriteLockUnlockFut, ldrFut, updaterFut);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    @Test
-    public void shouldNotBeProblemToPutToExpiredCacheConcurrently() throws Exception {
-        final AtomicBoolean end = new AtomicBoolean();
-
-        final IgniteEx srv = startGrids(3);
-
-        srv.cluster().state(ClusterState.ACTIVE);
-
-        // Start high workload.
-        IgniteInternalFuture<?> loadFut = runMultiThreadedAsync(() -> {
-            List<IgniteCache<Object, Object>> caches = F.asList(
-                srv.cache(CACHE_NAME_ATOMIC),
-                srv.cache(CACHE_NAME_TX),
-                srv.cache(CACHE_NAME_LOCAL_ATOMIC),
-                srv.cache(CACHE_NAME_LOCAL_TX),
-                srv.cache(CACHE_NAME_NEAR_ATOMIC),
-                srv.cache(CACHE_NAME_NEAR_TX)
-            );
-
-            while (!end.get() && !fail) {
-                for (IgniteCache<Object, Object> cache : caches) {
-                    for (int i = 0; i < ENTRIES; i++)
-                        cache.put(i, new byte[1024]);
-
-                    cache.putAll(new TreeMap<>(F.asMap(0, new byte[1024], 1, new byte[1024])));
-
-                    for (int i = 0; i < ENTRIES; i++)
-                        cache.get(i);
-
-                    cache.getAll(new TreeSet<>(F.asList(0, 1)));
-                }
-            }
-        }, WORKLOAD_THREADS_CNT, "high-workload");
-
-        try {
-            // Let's wait some time.
-            loadFut.get(10, TimeUnit.SECONDS);
-        }
-        catch (Exception e) {
-            assertFalse("Failure handler was called. See log above.", fail);
-
-            assertTrue(X.hasCause(e, IgniteFutureTimeoutCheckedException.class));
-        }
-        finally {
-            end.set(true);
-        }
-
-        assertFalse("Failure handler was called. See log above.", fail);
-    }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
index 46f1423..857aadf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
@@ -17,15 +17,22 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.db;
 
+import java.util.List;
+import java.util.Map;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.cache.expiry.AccessedExpiryPolicy;
 import javax.cache.expiry.CreatedExpiryPolicy;
 import javax.cache.expiry.Duration;
 import javax.cache.expiry.ExpiryPolicy;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -34,36 +41,72 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.NoOpFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.MvccFeatureChecker;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForAllFutures;
+
 /**
  * Test TTL worker with persistence enabled
  */
 @WithSystemProperty(key = IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, value = "5")
 public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
     /** */
-    public static final String CACHE_NAME = "expirableCache";
+    private static final String CACHE_NAME_ATOMIC = "expirable-cache-atomic";
+
+    /** */
+    private static final String CACHE_NAME_ATOMIC_NON_PERSISTENT = "expirable-non-persistent-cache-atomic";
+
+    /** */
+    private static final String CACHE_NAME_TX = "expirable-cache-tx";
+
+    /** */
+    private static final String CACHE_NAME_LOCAL_ATOMIC = "expirable-cache-local-atomic";
+
+    /** */
+    private static final String CACHE_NAME_LOCAL_TX = "expirable-cache-local-tx";
+
+    /** */
+    private static final String CACHE_NAME_NEAR_ATOMIC = "expirable-cache-near-atomic";
+
+    /** */
+    private static final String CACHE_NAME_NEAR_TX = "expirable-cache-near-tx";
 
     /** */
-    public static final String GROUP_NAME = "group1";
+    private static final String NON_PERSISTENT_DATA_REGION = "non-persistent-region";
 
     /** */
-    public static final int PART_SIZE = 32;
+    public static final int PART_SIZE = 2;
 
     /** */
     private static final int EXPIRATION_TIMEOUT = 10;
@@ -71,15 +114,14 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
     /** */
     public static final int ENTRIES = 50_000;
 
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-    }
+    /** */
+    public static final int SMALL_ENTRIES = 10;
 
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        super.afterTestsStopped();
-    }
+    /** */
+    private static final int WORKLOAD_THREADS_CNT = 16;
+
+    /** Set to {@code true} once the failure handler has been invoked. */
+    private volatile boolean failureHndTriggered;
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
@@ -104,31 +146,58 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         final IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
+        DataRegionConfiguration dfltRegion = new DataRegionConfiguration()
+            .setMaxSize(2L * 1024 * 1024 * 1024)
+            .setPersistenceEnabled(true);
+
+        DataRegionConfiguration nonPersistentRegion = new DataRegionConfiguration()
+            .setName(NON_PERSISTENT_DATA_REGION)
+            .setMaxSize(2L * 1024 * 1024 * 1024)
+            .setPersistenceEnabled(false);
+
         cfg.setDataStorageConfiguration(
             new DataStorageConfiguration()
-                .setDefaultDataRegionConfiguration(
-                    new DataRegionConfiguration()
-                        .setMaxSize(2L * 1024 * 1024 * 1024)
-                        .setPersistenceEnabled(true)
-                ).setWalMode(WALMode.LOG_ONLY));
-
-        cfg.setCacheConfiguration(getCacheConfiguration(CACHE_NAME));
+                .setDefaultDataRegionConfiguration(dfltRegion)
+                .setDataRegionConfigurations(nonPersistentRegion)
+                .setWalMode(WALMode.LOG_ONLY));
+
+        cfg.setCacheConfiguration(
+            getCacheConfiguration(CACHE_NAME_ATOMIC).setAtomicityMode(ATOMIC),
+            getCacheConfiguration(CACHE_NAME_TX).setAtomicityMode(TRANSACTIONAL),
+            getCacheConfiguration(CACHE_NAME_LOCAL_ATOMIC).setAtomicityMode(ATOMIC).setCacheMode(CacheMode.LOCAL),
+            getCacheConfiguration(CACHE_NAME_LOCAL_TX).setAtomicityMode(TRANSACTIONAL).setCacheMode(CacheMode.LOCAL),
+            getCacheConfiguration(CACHE_NAME_NEAR_ATOMIC).setAtomicityMode(ATOMIC)
+                .setNearConfiguration(new NearCacheConfiguration<>()),
+            getCacheConfiguration(CACHE_NAME_NEAR_TX).setAtomicityMode(TRANSACTIONAL)
+                .setNearConfiguration(new NearCacheConfiguration<>())
+        );
 
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+        return new NoOpFailureHandler() {
+            @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
+                failureHndTriggered = true;
+
+                return super.handle(ignite, failureCtx);
+            }
+        };
+    }
+
     /**
      * Returns a new cache configuration with the given name and {@code GROUP_NAME} group.
+     *
      * @param name Cache name.
      * @return Cache configuration.
      */
-    private CacheConfiguration getCacheConfiguration(String name) {
-        CacheConfiguration ccfg = new CacheConfiguration();
+    private CacheConfiguration<?, ?> getCacheConfiguration(String name) {
+        CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>();
 
         ccfg.setName(name);
-        ccfg.setGroupName(GROUP_NAME);
         ccfg.setAffinity(new RendezvousAffinityFunction(false, PART_SIZE));
-        ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, EXPIRATION_TIMEOUT)));
+        ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, EXPIRATION_TIMEOUT)));
         ccfg.setEagerTtl(true);
         ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
         ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
@@ -150,23 +219,23 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
     @Test
     public void testTtlIsAppliedForMultipleCaches() throws Exception {
         IgniteEx srv = startGrid(0);
-        srv.cluster().active(true);
+        srv.cluster().state(ACTIVE);
 
         int cacheCnt = 2;
 
         // Create a new caches in the same group.
         // It is important that initially created cache CACHE_NAME remains empty.
         for (int i = 0; i < cacheCnt; ++i) {
-            String cacheName = CACHE_NAME + "-" + i;
+            String cacheName = CACHE_NAME_ATOMIC + "-" + i;
 
             srv.getOrCreateCache(getCacheConfiguration(cacheName));
 
             fillCache(srv.cache(cacheName));
         }
 
-        waitAndCheckExpired(srv, srv.cache(CACHE_NAME + "-" + (cacheCnt - 1)));
+        waitAndCheckExpired(srv, srv.cache(CACHE_NAME_ATOMIC + "-" + (cacheCnt - 1)));
 
-        srv.cluster().active(false);
+        srv.cluster().state(INACTIVE);
 
         stopAllGrids();
     }
@@ -182,27 +251,141 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
     /**
      * @throws Exception if failed.
      */
+    @Test
+    public void testPutOpsIntoCacheWithExpirationConcurrentlyWithCheckpointCompleteSuccessfully() throws Exception {
+        IgniteEx ig0 = startGrid(0);
+
+        ig0.cluster().state(ACTIVE);
+
+        IgniteCache<Object, Object> cache = ig0.getOrCreateCache(CACHE_NAME_ATOMIC);
+
+        AtomicBoolean timeoutReached = new AtomicBoolean(false);
+
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig0.context().cache().context().database();
+
+        IgniteInternalFuture<?> ldrFut = runMultiThreadedAsync(() -> {
+            while (!timeoutReached.get()) {
+                Map<Object, Object> map = new TreeMap<>();
+
+                for (int i = 0; i < ENTRIES; i++)
+                    map.put(i, i);
+
+                cache.putAll(map);
+            }
+        }, WORKLOAD_THREADS_CNT, "loader");
+
+        IgniteInternalFuture<?> updaterFut = runMultiThreadedAsync(() -> {
+            while (!timeoutReached.get()) {
+                for (int i = 0; i < SMALL_ENTRIES; i++)
+                    cache.put(i, i * 10);
+            }
+        }, WORKLOAD_THREADS_CNT, "updater");
+
+        IgniteInternalFuture<?> cpWriteLockUnlockFut = runAsync(() -> {
+            ReentrantReadWriteLock lock = U.field(db, "checkpointLock");
+
+            while (!timeoutReached.get()) {
+                try {
+                    lock.writeLock().lockInterruptibly();
+
+                    doSleep(30);
+                }
+                catch (InterruptedException ignored) {
+                    break;
+                }
+                finally {
+                    lock.writeLock().unlock();
+                }
+
+                doSleep(30);
+            }
+        }, "cp-write-lock-holder");
+
+        doSleep(10_000);
+
+        timeoutReached.set(true);
+
+        waitForAllFutures(cpWriteLockUnlockFut, ldrFut, updaterFut);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testConcurrentPutOpsToCacheWithExpirationCompleteSuccesfully() throws Exception {
+        final AtomicBoolean end = new AtomicBoolean();
+
+        final IgniteEx srv = startGrids(3);
+
+        srv.cluster().state(ACTIVE);
+
+        // Start high workload.
+        IgniteInternalFuture<?> loadFut = runMultiThreadedAsync(() -> {
+            List<IgniteCache<Object, Object>> caches = F.asList(
+                srv.cache(CACHE_NAME_ATOMIC),
+                srv.cache(CACHE_NAME_TX),
+                srv.cache(CACHE_NAME_LOCAL_ATOMIC),
+                srv.cache(CACHE_NAME_LOCAL_TX),
+                srv.cache(CACHE_NAME_NEAR_ATOMIC),
+                srv.cache(CACHE_NAME_NEAR_TX)
+            );
+
+            while (!end.get() && !failureHndTriggered) {
+                for (IgniteCache<Object, Object> cache : caches) {
+                    for (int i = 0; i < SMALL_ENTRIES; i++)
+                        cache.put(i, new byte[1024]);
+
+                    cache.putAll(new TreeMap<>(F.asMap(0, new byte[1024], 1, new byte[1024])));
+
+                    for (int i = 0; i < SMALL_ENTRIES; i++)
+                        cache.get(i);
+
+                    cache.getAll(new TreeSet<>(F.asList(0, 1)));
+                }
+            }
+        }, WORKLOAD_THREADS_CNT, "high-workload");
+
+        try {
+            // Let's wait some time.
+            loadFut.get(10, TimeUnit.SECONDS);
+        }
+        catch (Exception e) {
+            assertFalse("Failure handler was called. See log above.", failureHndTriggered);
+
+            assertTrue(X.hasCause(e, IgniteFutureTimeoutCheckedException.class));
+        }
+        finally {
+            end.set(true);
+        }
+
+        assertFalse("Failure handler was called. See log above.", failureHndTriggered);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
     private void loadAndWaitForCleanup(boolean restartGrid) throws Exception {
         IgniteEx srv = startGrid(0);
-        srv.cluster().active(true);
 
-        fillCache(srv.cache(CACHE_NAME));
+        srv.cluster().state(ACTIVE);
+
+        fillCache(srv.cache(CACHE_NAME_ATOMIC));
 
         if (restartGrid) {
             srv.context().cache().context().database().waitForCheckpoint("test-checkpoint");
 
             stopGrid(0);
             srv = startGrid(0);
-            srv.cluster().active(true);
+            srv.cluster().state(ACTIVE);
         }
 
-        final IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME);
+        final IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME_ATOMIC);
 
         printStatistics((IgniteCacheProxy)cache, "After restart from LFS");
 
         waitAndCheckExpired(srv, cache);
 
-        srv.cluster().active(false);
+        srv.cluster().state(INACTIVE);
 
         stopAllGrids();
     }
@@ -215,22 +398,22 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
         IgniteEx srv = startGrid(0);
 
         srv.cluster().baselineAutoAdjustEnabled(false);
-        srv.cluster().active(true);
+        srv.cluster().state(ACTIVE);
 
-        fillCache(srv.cache(CACHE_NAME));
+        fillCache(srv.cache(CACHE_NAME_ATOMIC));
 
         srv = startGrid(1);
 
         //causes rebalancing start
         srv.cluster().setBaselineTopology(srv.cluster().topologyVersion());
 
-        final IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME);
+        final IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME_ATOMIC);
 
         printStatistics((IgniteCacheProxy)cache, "After rebalancing start");
 
         waitAndCheckExpired(srv, cache);
 
-        srv.cluster().active(false);
+        srv.cluster().state(INACTIVE);
 
         stopAllGrids();
     }
@@ -250,13 +433,13 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
 
             ExpiryPolicy plc = CreatedExpiryPolicy.factoryOf(Duration.ONE_DAY).create();
 
-            IgniteCache<Integer, byte[]> cache0 = srv.cache(CACHE_NAME);
+            IgniteCache<Integer, byte[]> cache0 = srv.cache(CACHE_NAME_ATOMIC);
 
             fillCache(cache0.withExpiryPolicy(plc));
 
             srv = startGrid(2);
 
-            IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME);
+            IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME_ATOMIC);
 
             //causes rebalancing start
             srv.cluster().setBaselineTopology(srv.cluster().topologyVersion());
@@ -274,13 +457,43 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
             stopGrid(1);
             startGrid(1);
 
-            srv.cluster().active(false);
+            srv.cluster().state(INACTIVE);
         }
         finally {
             stopAllGrids();
         }
     }
 
+    /**
+     * Tests that entries of a cache in a non-persistent data region expire without triggering the failure handler.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExpirationNonPersistentRegion() throws Exception {
+        IgniteEx srv = startGrid(0);
+
+        srv.cluster().baselineAutoAdjustEnabled(false);
+        srv.cluster().state(ACTIVE);
+
+        CacheConfiguration<?, ?> cfg =
+            getCacheConfiguration(CACHE_NAME_ATOMIC_NON_PERSISTENT)
+                .setAtomicityMode(ATOMIC)
+                .setDataRegionName(NON_PERSISTENT_DATA_REGION);
+
+        srv.getOrCreateCache(cfg);
+
+        IgniteCache<Integer, byte[]> nonPersistentCache = srv.cache(CACHE_NAME_ATOMIC_NON_PERSISTENT);
+
+        fillCache(nonPersistentCache);
+
+        waitAndCheckExpired(srv, nonPersistentCache);
+
+        stopAllGrids();
+
+        assertFalse("Failure handler should not be triggered.", failureHndTriggered);
+    }
+
     /** */
     protected void fillCache(IgniteCache<Integer, byte[]> cache) {
         cache.putAll(new TreeMap<Integer, byte[]>() {{
@@ -311,7 +524,7 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
         printStatistics((IgniteCacheProxy)cache, "After timeout");
 
         GridCacheSharedContext ctx = srv.context().cache().context();
-        GridCacheContext cctx = ctx.cacheContext(CU.cacheId(CACHE_NAME));
+        GridCacheContext cctx = ctx.cacheContext(CU.cacheId(CACHE_NAME_ATOMIC));
 
         // Check partitions through internal API.
         for (int partId = 0; partId < PART_SIZE; ++partId) {
@@ -321,7 +534,7 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
                 continue;
 
             IgniteCacheOffheapManager.CacheDataStore dataStore =
-                ctx.cache().cacheGroup(CU.cacheId(GROUP_NAME)).offheap().dataStore(locPart);
+                ctx.cache().cacheGroup(CU.cacheId(CACHE_NAME_ATOMIC)).offheap().dataStore(locPart);
 
             GridCursor cur = dataStore.cursor();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 2f065c4..d930b3b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebal
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsWithCompactionTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest;
-import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlDeactivateOnHighloadTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgniteShutdownOnSupplyMessageFailureTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.SlowHistoricalRebalanceSmallHistoryTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointFailBeforeWriteMarkTest;
@@ -257,8 +256,6 @@ public class IgnitePdsTestSuite2 {
 
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsPartitionsStateRecoveryTest.class, ignoredTests);
 
-        GridTestUtils.addTestIfNeeded(suite, IgnitePdsWithTtlDeactivateOnHighloadTest.class, ignoredTests);
-
         GridTestUtils.addTestIfNeeded(suite, WalPreloadingConcurrentTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, IgniteWalRebalanceLoggingTest.class, ignoredTests);