You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/03/20 13:22:16 UTC

[ignite] branch master updated: IGNITE-11438 Fixed issue with TTL manager not cleaning entries from the underlying store. Fixes #6197

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 145feac  IGNITE-11438 Fixed issue with TTL manager not cleaning entries from the underlying store. Fixes #6197
145feac is described below

commit 145feac734d6edbb776a1d8dad82cc5361560da3
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Wed Mar 20 16:19:46 2019 +0300

    IGNITE-11438 Fixed issue with TTL manager not cleaning entries from the underlying store. Fixes #6197
---
 .../processors/cache/GridCacheMapEntry.java        |  10 +-
 .../processors/cache/GridCacheTtlManager.java      |  47 ++++++--
 .../cache/IgniteCacheOffheapManagerImpl.java       |  25 +----
 .../cache/persistence/GridCacheOffheapManager.java |  45 ++++----
 .../cache/persistence/db/IgnitePdsWithTtlTest.java | 118 +++++++++++++++++----
 5 files changed, 166 insertions(+), 79 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index a3eda18..2428290 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -552,9 +552,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                     update(val, read.expireTime(), 0, read.version(), false);
 
-                    long delta = checkExpire && read.expireTime() > 0 ? read.expireTime() - U.currentTimeMillis() : 0;
-
-                    if (delta >= 0)
+                    if (!(checkExpire && read.expireTime() > 0) || (read.expireTime() > U.currentTimeMillis()))
                         return read;
                     else {
                         if (onExpired(this.val, null)) {
@@ -3980,7 +3978,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             long expireTime = expireTimeExtras();
 
-            if (!(expireTime > 0 && expireTime < U.currentTimeMillis()))
+            if (!(expireTime > 0 && expireTime <= U.currentTimeMillis()))
                 return false;
 
             CacheObject expiredVal = this.val;
@@ -5760,7 +5758,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         private CacheDataRow checkRowExpired(CacheDataRow row) throws IgniteCheckedException {
             assert row != null;
 
-            if (!(row.expireTime() > 0 && row.expireTime() < U.currentTimeMillis()))
+            if (!(row.expireTime() > 0 && row.expireTime() <= U.currentTimeMillis()))
                 return row;
 
             GridCacheContext cctx = entry.context();
@@ -6131,7 +6129,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         private boolean checkRowExpired(CacheDataRow row) throws IgniteCheckedException {
             assert row != null;
 
-            if (!(row.expireTime() > 0 && row.expireTime() < U.currentTimeMillis()))
+            if (!(row.expireTime() > 0 && row.expireTime() <= U.currentTimeMillis()))
                 return false;
 
             GridCacheContext cctx = entry.context();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 1a887e2..f82cc75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
@@ -38,9 +39,22 @@ import org.jetbrains.annotations.Nullable;
  * {@link CacheConfiguration#isEagerTtl()} flag is set.
  */
 public class GridCacheTtlManager extends GridCacheManagerAdapter {
-    /** Entries pending removal. */
+    /**
+     * Throttling timeout in millis which avoids excessive PendingTree access on unwind
+     * if there is nothing to clean yet.
+     */
+    public static final long UNWIND_THROTTLING_TIMEOUT = Long.getLong(
+        IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, 500L);
+
+    /** Entries pending removal. This collection tracks entries for near cache only. */
     private GridConcurrentSkipListSetEx pendingEntries;
 
+    /** Indicates that the underlying pending tree has entries with expire policy enabled. */
+    protected volatile boolean hasPendingEntries;
+
+    /** Timestamp when next clean try will be allowed. Used for throttling on per-cache basis. */
+    protected volatile long nextCleanTime;
+
     /** See {@link CacheConfiguration#isEagerTtl()}. */
     private volatile boolean eagerTtlEnabled;
 
@@ -140,6 +154,22 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
         return (pendingEntries != null ? pendingEntries.sizex() : 0) + cctx.offheap().expiredSize();
     }
 
+    /**
+     * Updates the flag {@code hasPendingEntries} with the given value.
+     *
+     * @param update {@code true} if the underlying pending tree has entries with expire policy enabled.
+     */
+    public void hasPendingEntries(boolean update) {
+        hasPendingEntries = update;
+    }
+
+    /**
+     * @return {@code true} if the underlying pending tree has entries with expire policy enabled.
+     */
+    public boolean hasPendingEntries() {
+        return hasPendingEntries;
+    }
+
     /** {@inheritDoc} */
     @Override public void printMemoryStats() {
         try {
@@ -154,13 +184,6 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
     }
 
     /**
-     * Expires entries by TTL.
-     */
-    public void expire() {
-        expire(-1);
-    }
-
-    /**
      * Processes specified amount of expired entries.
      *
      * @param amount Limit of processed entries by single call, {@code -1} for no limit.
@@ -201,14 +224,20 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
                 }
             }
 
-            if(!(cctx.affinityNode() && cctx.ttl().eagerTtlEnabled()))
+            if(!cctx.affinityNode())
                 return false;  /* Pending tree never contains entries for that cache */
 
+            if (!hasPendingEntries || nextCleanTime > U.currentTimeMillis())
+                return false;
+
             boolean more = cctx.offheap().expire(dhtCtx, expireC, amount);
 
             if (more)
                 return true;
 
+            // There is nothing to clean, so the next clean up can be postponed.
+            nextCleanTime = U.currentTimeMillis() + UNWIND_THROTTLING_TIMEOUT;
+
             if (amount != -1 && pendingEntries != null) {
                 EntryWrapper e = pendingEntries.firstx();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 22783a4..04443be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -139,13 +139,6 @@ import static org.apache.ignite.internal.util.IgniteTree.OperationType.PUT;
  *
  */
 public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager {
-    /**
-     * Throttling timeout in millis which avoid excessive PendingTree access on unwind
-     * if there is nothing to clean yet.
-     */
-    public static final long UNWIND_THROTTLING_TIMEOUT = Long.getLong(
-        IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, 500L);
-
     /** */
     protected GridCacheSharedContext ctx;
 
@@ -165,12 +158,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     private PendingEntriesTree pendingEntries;
 
     /** */
-    protected volatile boolean hasPendingEntries;
-
-    /** Timestamp when next clean try will be allowed. Used for throttling on per-group basis. */
-    protected volatile long nextCleanTime;
-
-    /** */
     private final GridAtomicLong globalRmvId = new GridAtomicLong(U.currentTimeMillis() * 1000_000);
 
     /** */
@@ -1319,17 +1306,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     ) throws IgniteCheckedException {
         assert !cctx.isNear() : cctx.name();
 
-        if (!hasPendingEntries || nextCleanTime > U.currentTimeMillis())
-            return false;
-
         assert pendingEntries != null;
 
         int cleared = expireInternal(cctx, c, amount);
 
-        // Throttle if there is nothing to clean anymore.
-        if (cleared < amount)
-            nextCleanTime = U.currentTimeMillis() + UNWIND_THROTTLING_TIMEOUT;
-
         return amount != -1 && cleared >= amount;
     }
 
@@ -2256,7 +2236,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
                 int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
 
-                boolean cleanup = cctx.queries().enabled() || hasPendingEntries;
+                boolean cleanup = cctx.queries().enabled() || cctx.ttl().hasPendingEntries();
 
                 assert cctx.shared().database().checkpointLockIsHeldByThread();
 
@@ -2572,7 +2552,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             if (pendingTree() != null && expireTime != 0) {
                 pendingTree().putx(new PendingRow(cacheId, expireTime, newRow.link()));
 
-                hasPendingEntries = true;
+                if (!cctx.ttl().hasPendingEntries())
+                    cctx.ttl().hasPendingEntries(true);
             }
         }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 26a535c..ccba934 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -964,9 +964,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
     ) throws IgniteCheckedException {
         assert !cctx.isNear() : cctx.name();
 
-        if (!hasPendingEntries || nextCleanTime > U.currentTimeMillis())
-            return false;
-
         // Prevent manager being stopped in the middle of pds operation.
         if (!busyLock.enterBusy())
             return false;
@@ -980,10 +977,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                 if (amount != -1 && cleared >= amount)
                     return true;
             }
-
-            // Throttle if there is nothing to clean anymore.
-            if (cleared < amount)
-                nextCleanTime = U.currentTimeMillis() + UNWIND_THROTTLING_TIMEOUT;
         }
         finally {
             busyLock.leaveBusy();
@@ -1427,8 +1420,16 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         /** */
         private volatile CacheDataStore delegate;
 
-        /** Timestamp when next clean try will be allowed for current partition.
-         * Used for fine-grained throttling on per-partition basis. */
+        /**
+         * Cache id which should be throttled.
+         */
+        private volatile int lastThrottledCacheId;
+
+        /**
+         * Timestamp when next clean try will be allowed for the current partition
+         * in accordance with the value of {@code lastThrottledCacheId}.
+         * Used for fine-grained throttling on per-partition basis.
+         */
         private volatile long nextStoreCleanTime;
 
         /** */
@@ -1574,8 +1575,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
 
                     pendingTree = pendingTree0;
 
-                    if (!hasPendingEntries && !pendingTree0.isEmpty())
-                        hasPendingEntries = true;
+                    if (!pendingTree0.isEmpty())
+                        grp.caches().forEach(cctx -> cctx.ttl().hasPendingEntries(true));
 
                     int grpId = grp.groupId();
                     long partMetaId = pageMem.partitionMetaPageId(grpId, partId);
@@ -2314,14 +2315,16 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
          * @return cleared entries count.
          * @throws IgniteCheckedException If failed.
          */
-        public int purgeExpired(GridCacheContext cctx,
+        public int purgeExpired(
+            GridCacheContext cctx,
             IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
-            int amount) throws IgniteCheckedException {
+            int amount
+        ) throws IgniteCheckedException {
             CacheDataStore delegate0 = init0(true);
 
             long now = U.currentTimeMillis();
 
-            if (delegate0 == null || nextStoreCleanTime > now)
+            if (delegate0 == null || (cctx.cacheId() == lastThrottledCacheId && nextStoreCleanTime > now))
                 return 0;
 
             assert pendingTree != null : "Partition data store was not initialized.";
@@ -2329,8 +2332,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             int cleared = purgeExpiredInternal(cctx, c, amount);
 
             // Throttle if there is nothing to clean anymore.
-            if (cleared < amount)
-                nextStoreCleanTime = now + UNWIND_THROTTLING_TIMEOUT;
+            if (cleared < amount) {
+                lastThrottledCacheId = cctx.cacheId();
+
+                nextStoreCleanTime = now + GridCacheTtlManager.UNWIND_THROTTLING_TIMEOUT;
+            }
 
             return cleared;
         }
@@ -2344,10 +2350,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
          * @return cleared entries count.
          * @throws IgniteCheckedException If failed.
          */
-        private int purgeExpiredInternal(GridCacheContext cctx,
+        private int purgeExpiredInternal(
+            GridCacheContext cctx,
             IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
-            int amount) throws IgniteCheckedException {
-
+            int amount
+        ) throws IgniteCheckedException {
             GridDhtLocalPartition part = cctx.topology().localPartition(partId, AffinityTopologyVersion.NONE, false, false);
 
             // Skip non-owned partitions.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
index e2db501..fc4fa6d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
@@ -24,6 +24,7 @@ import javax.cache.expiry.CreatedExpiryPolicy;
 import javax.cache.expiry.Duration;
 import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -34,10 +35,15 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.MvccFeatureChecker;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -53,10 +59,16 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
     public static final String CACHE_NAME = "expirableCache";
 
     /** */
+    public static final String GROUP_NAME = "group1";
+
+    /** */
+    public static final int PART_SIZE = 32;
+
+    /** */
     private static final int EXPIRATION_TIMEOUT = 10;
 
     /** */
-    public static final int ENTRIES = 100_000;
+    public static final int ENTRIES = 50_000;
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
@@ -95,29 +107,39 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         final IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        final CacheConfiguration ccfg = new CacheConfiguration();
-        ccfg.setName(CACHE_NAME);
-        ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
-        ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, EXPIRATION_TIMEOUT)));
-        ccfg.setEagerTtl(true);
-        ccfg.setGroupName("group1");
-        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
-
         cfg.setDataStorageConfiguration(
             new DataStorageConfiguration()
                 .setDefaultDataRegionConfiguration(
                     new DataRegionConfiguration()
-                        .setMaxSize(192L * 1024 * 1024)
+                        .setMaxSize(2L * 1024 * 1024 * 1024)
                         .setPersistenceEnabled(true)
                 ).setWalMode(WALMode.LOG_ONLY));
 
-        cfg.setCacheConfiguration(ccfg);
+        cfg.setCacheConfiguration(getCacheConfiguration(CACHE_NAME));
 
         return cfg;
     }
 
     /**
+     * Returns a new cache configuration with the given name and {@code GROUP_NAME} group.
+     * @param name Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration getCacheConfiguration(String name) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setGroupName(GROUP_NAME);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, PART_SIZE));
+        ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, EXPIRATION_TIMEOUT)));
+        ccfg.setEagerTtl(true);
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+        return ccfg;
+    }
+
+    /**
      * @throws Exception if failed.
      */
     @Test
@@ -129,6 +151,31 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
      * @throws Exception if failed.
      */
     @Test
+    public void testTtlIsAppliedForMultipleCaches() throws Exception {
+        IgniteEx srv = startGrid(0);
+        srv.cluster().active(true);
+
+        int cacheCnt = 2;
+
+        // Create new caches in the same group.
+        // It is important that initially created cache CACHE_NAME remains empty.
+        for (int i = 0; i < cacheCnt; ++i) {
+            String cacheName = CACHE_NAME + "-" + i;
+
+            srv.getOrCreateCache(getCacheConfiguration(cacheName));
+
+            fillCache(srv.cache(cacheName));
+        }
+
+        waitAndCheckExpired(srv, srv.cache(CACHE_NAME + "-" + (cacheCnt - 1)));
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
     public void testTtlIsAppliedAfterRestart() throws Exception {
         loadAndWaitForCleanup(true);
     }
@@ -143,6 +190,8 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
         fillCache(srv.cache(CACHE_NAME));
 
         if (restartGrid) {
+            srv.context().cache().context().database().wakeupForCheckpoint("test-checkpoint");
+
             stopGrid(0);
             srv = startGrid(0);
             srv.cluster().active(true);
@@ -150,9 +199,9 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
 
         final IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME);
 
-        pringStatistics((IgniteCacheProxy)cache, "After restart from LFS");
+        printStatistics((IgniteCacheProxy)cache, "After restart from LFS");
 
-        waitAndCheckExpired(cache);
+        waitAndCheckExpired(srv, cache);
 
         stopAllGrids();
     }
@@ -174,9 +223,9 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
 
         final IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME);
 
-        pringStatistics((IgniteCacheProxy)cache, "After rebalancing start");
+        printStatistics((IgniteCacheProxy)cache, "After rebalancing start");
 
-        waitAndCheckExpired(cache);
+        waitAndCheckExpired(srv, cache);
 
         stopAllGrids();
     }
@@ -233,26 +282,49 @@ public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
         for (int i = 0; i < ENTRIES; i++)
             cache.get(i); // touch entries
 
-        pringStatistics((IgniteCacheProxy)cache, "After cache puts");
+        printStatistics((IgniteCacheProxy)cache, "After cache puts");
     }
 
     /** */
     protected void waitAndCheckExpired(
-        final IgniteCache<Integer, byte[]> cache) throws IgniteInterruptedCheckedException {
-        GridTestUtils.waitForCondition(new PA() {
+        IgniteEx srv,
+        final IgniteCache<Integer, byte[]> cache
+    ) throws IgniteCheckedException {
+        boolean awaited = GridTestUtils.waitForCondition(new PA() {
             @Override public boolean apply() {
                 return cache.size() == 0;
             }
-        }, TimeUnit.SECONDS.toMillis(EXPIRATION_TIMEOUT + 1));
+        }, TimeUnit.SECONDS.toMillis(EXPIRATION_TIMEOUT + EXPIRATION_TIMEOUT / 2));
+
+        assertTrue("Cache is not empty. size=" + cache.size(), awaited);
+
+        printStatistics((IgniteCacheProxy)cache, "After timeout");
+
+        GridCacheSharedContext ctx = srv.context().cache().context();
+        GridCacheContext cctx = ctx.cacheContext(CU.cacheId(CACHE_NAME));
 
-        pringStatistics((IgniteCacheProxy)cache, "After timeout");
+        // Check partitions through internal API.
+        for (int partId = 0; partId < PART_SIZE; ++partId) {
+            GridDhtLocalPartition locPart = cctx.dht().topology().localPartition(partId);
+
+            if (locPart == null)
+                continue;
+
+            IgniteCacheOffheapManager.CacheDataStore dataStore =
+                ctx.cache().cacheGroup(CU.cacheId(GROUP_NAME)).offheap().dataStore(locPart);
+
+            GridCursor cur = dataStore.cursor();
+
+            assertFalse(cur.next());
+            assertEquals(0, locPart.fullSize());
+        }
 
         for (int i = 0; i < ENTRIES; i++)
             assertNull(cache.get(i));
     }
 
     /** */
-    private void pringStatistics(IgniteCacheProxy cache, String msg) {
+    private void printStatistics(IgniteCacheProxy cache, String msg) {
         System.out.println(msg + " {{");
         cache.context().printMemoryStats();
         System.out.println("}} " + msg);