You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2016/02/15 16:03:51 UTC

ignite git commit: IGNITE-2610 - WIP

Repository: ignite
Updated Branches:
  refs/heads/ignite-2610 [created] 1adc02a58


IGNITE-2610 - WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1adc02a5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1adc02a5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1adc02a5

Branch: refs/heads/ignite-2610
Commit: 1adc02a584c51008fa17f75271fd3e94912043e9
Parents: 46b6a76
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Feb 15 18:03:36 2016 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Feb 15 18:03:36 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |  47 +--
 .../processors/cache/GridCacheTtlManager.java   |  73 +++-
 .../distributed/dht/GridDhtCacheEntry.java      |   2 +-
 .../atomic/GridDhtAtomicOffHeapCacheEntry.java  |   9 +
 .../distributed/near/GridNearCacheEntry.java    |   4 +-
 .../cache/query/GridCacheQueryManager.java      | 367 ++++++++++++-------
 .../IgniteCacheAtomicExpiryPolicyTest.java      |   5 +
 ...gniteCacheAtomicOffheapExpiryPolicyTest.java |  35 ++
 .../IgniteCacheExpiryPolicyAbstractTest.java    | 245 +++++++++----
 9 files changed, 549 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index c1eeb5e..bf75fc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -529,7 +529,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
 
                         // Set unswapped value.
-                        update(val, e.expireTime(), e.ttl(), e.version());
+                        update(val, e.expireTime(), e.ttl(), e.version(), false);
 
                         // Must update valPtr again since update() will reset it.
                         if (cctx.offheapTiered() && e.offheapPointer() > 0)
@@ -945,7 +945,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     boolean hadValPtr = hasOffHeapPointer();
 
                     // Don't change version for read-through.
-                    update(ret, expTime, ttl, nextVer);
+                    update(ret, expTime, ttl, nextVer, true);
 
                     if (hadValPtr && cctx.offheapTiered())
                         cctx.swap().removeOffheap(key);
@@ -1040,7 +1040,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             deletedUnlocked(true);
                     }
 
-                    update(ret, expTime, ttl, nextVer);
+                    update(ret, expTime, ttl, nextVer, true);
 
                     touch = true;
 
@@ -1194,7 +1194,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (updateCntr != null && updateCntr != 0)
                 updateCntr0 = updateCntr;
 
-            update(val, expireTime, ttl, newVer);
+            update(val, expireTime, ttl, newVer, true);
 
             drReplicate(drType, val, newVer);
 
@@ -1356,7 +1356,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             boolean hadValPtr = hasOffHeapPointer();
 
-            update(null, 0, 0, newVer);
+            update(null, 0, 0, newVer, true);
 
             if (cctx.offheapTiered() && hadValPtr) {
                 boolean rmv = cctx.swap().removeOffheap(key);
@@ -1572,7 +1572,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 else
                     clearIndex(null);
 
-                update(old, expireTime, ttl, ver);
+                update(old, expireTime, ttl, ver, true);
             }
 
             // Apply metrics.
@@ -1719,7 +1719,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 assert ttl != CU.TTL_ZERO;
 
-                update(updated, expireTime, ttl, ver);
+                update(updated, expireTime, ttl, ver, true);
 
                 if (evt) {
                     CacheObject evtOld = null;
@@ -1756,7 +1756,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 // in load methods without actually holding entry lock.
                 clearIndex(old);
 
-                update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver);
+                update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver, true);
 
                 if (cctx.offheapTiered() && hasValPtr) {
                     boolean rmv = cctx.swap().removeOffheap(key);
@@ -2131,7 +2131,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 else
                     clearIndex(null);
 
-                update(oldVal, initExpireTime, initTtl, ver);
+                update(oldVal, initExpireTime, initTtl, ver, true);
 
                 if (deletedUnlocked() && oldVal != null && !isInternal())
                     deletedUnlocked(false);
@@ -2345,7 +2345,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 // in load methods without actually holding entry lock.
                 updateIndex(updated, newExpireTime, newVer, oldVal);
 
-                update(updated, newExpireTime, newTtl, newVer);
+                update(updated, newExpireTime, newTtl, newVer, true);
 
                 updateCntr0 = nextPartCounter(topVer);
 
@@ -2430,7 +2430,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 boolean hasValPtr = hasOffHeapPointer();
 
                 // Clear value on backup. Entry will be removed from cache when it got evicted from queue.
-                update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer);
+                update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true);
 
                 assert newSysTtl == CU.TTL_NOT_CHANGED;
                 assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE;
@@ -2707,7 +2707,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                     if (cctx.deferredDelete() && !isStartVersion() && !detached() && !isInternal()) {
                         if (!deletedUnlocked()) {
-                            update(null, 0L, 0L, ver);
+                            update(null, 0L, 0L, ver, true);
 
                             deletedUnlocked(true);
 
@@ -2895,24 +2895,24 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @param ttl Time to live.
      * @param ver Update version.
      */
-    protected final void update(@Nullable CacheObject val, long expireTime, long ttl, GridCacheVersion ver) {
+    protected final void update(@Nullable CacheObject val, long expireTime, long ttl, GridCacheVersion ver, boolean addTracked) {
         assert ver != null;
         assert Thread.holdsLock(this);
         assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0 : ttl;
 
         long oldExpireTime = expireTimeExtras();
 
-        if (oldExpireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl())
+        if (addTracked && oldExpireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl())
             cctx.ttl().removeTrackedEntry(this);
 
         value(val);
 
         ttlAndExpireTimeExtras(ttl, expireTime);
 
-        if (expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl())
-            cctx.ttl().addTrackedEntry(this);
-
         this.ver = ver;
+
+        if (addTracked && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl())
+            cctx.ttl().addTrackedEntry(this);
     }
 
     /**
@@ -2950,7 +2950,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     /**
      * @param ttl Time to live.
      */
-    private void updateTtl(long ttl) {
+    protected void updateTtl(long ttl) {
         assert ttl >= 0 || ttl == CU.TTL_ZERO : ttl;
         assert Thread.holdsLock(this);
 
@@ -3224,7 +3224,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     @Override public synchronized CacheObject rawPut(CacheObject val, long ttl) {
         CacheObject old = this.val;
 
-        update(val, CU.toExpireTime(ttl), ttl, nextVersion());
+        update(val, CU.toExpireTime(ttl), ttl, nextVersion(), true);
 
         return old;
     }
@@ -3252,7 +3252,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     updateIndex(val, expTime, ver, null);
 
                 // Version does not change for load ops.
-                update(val, expTime, ttl, ver);
+                update(val, expTime, ttl, ver, true);
 
                 boolean skipQryNtf = false;
 
@@ -3338,7 +3338,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             update(val,
                 unswapped.expireTime(),
                 unswapped.ttl(),
-                unswapped.version()
+                unswapped.version(),
+                true
             );
 
             return true;
@@ -3397,7 +3398,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 }
 
                 // Version does not change for load ops.
-                update(val, expTime, ttl, newVer);
+                update(val, expTime, ttl, newVer, true);
 
                 return newVer;
             }
@@ -3612,7 +3613,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     if (!obsolete()) {
                         if (cctx.deferredDelete() && !detached() && !isInternal()) {
                             if (!deletedUnlocked()) {
-                                update(null, 0L, 0L, ver0 = ver);
+                                update(null, 0L, 0L, ver0 = ver, true);
 
                                 deletedUnlocked(true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index bdb1f18..d119317 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -18,11 +18,13 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.thread.IgniteThread;
@@ -74,6 +76,9 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
      * @param entry Entry to add.
      */
     public void addTrackedEntry(GridCacheMapEntry entry) {
+        assert Thread.holdsLock(entry);
+        assert cleanupWorker != null;
+
         pendingEntries.add(new EntryWrapper(entry));
     }
 
@@ -82,10 +87,18 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
      */
     public void removeTrackedEntry(GridCacheMapEntry entry) {
         assert Thread.holdsLock(entry);
+        assert cleanupWorker != null;
 
         pendingEntries.remove(new EntryWrapper(entry));
     }
 
+    /**
+     * @return The size of pending entries.
+     */
+    public int pendingSize() {
+        return pendingEntries.sizex();
+    }
+
     /** {@inheritDoc} */
     @Override public void printMemoryStats() {
         X.println(">>>");
@@ -150,6 +163,46 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param cctx Cache context.
+     * @param key1 Left key to compare.
+     * @param key2 Right key to compare.
+     * @return Comparison result.
+     */
+    private static int compareKeys(GridCacheContext cctx, CacheObject key1, CacheObject key2) {
+        int key1Hash = key1.hashCode();
+        int key2Hash = key2.hashCode();
+
+        int res = Integer.compare(key1Hash, key2Hash);
+
+        if (res == 0) {
+            key1 = (CacheObject)cctx.unwrapTemporary(key1);
+            key2 = (CacheObject)cctx.unwrapTemporary(key2);
+
+            try {
+                byte[] key1ValBytes = key1.valueBytes(cctx.cacheObjectContext());
+                byte[] key2ValBytes = key2.valueBytes(cctx.cacheObjectContext());
+
+                // Must not do fair array comparison.
+                res = Integer.compare(key1ValBytes.length, key2ValBytes.length);
+
+                if (res == 0) {
+                    for (int i = 0; i < key1ValBytes.length; i++) {
+                        res = Byte.compare(key1ValBytes[i], key2ValBytes[i]);
+
+                        if (res != 0)
+                            break;
+                    }
+                }
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        return res;
+    }
+
+    /**
      * Entry wrapper.
      */
     private static class EntryWrapper implements Comparable<EntryWrapper> {
@@ -174,8 +227,12 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
         @Override public int compareTo(EntryWrapper o) {
             int res = Long.compare(expireTime, o.expireTime);
 
-            if (res == 0)
-                res = Long.compare(entry.startVersion(), o.entry.startVersion());
+            if (res == 0) {
+                // Must compare entries of the same cache.
+                assert entry.context() == o.entry.context();
+
+                res = compareKeys(entry.context(), entry.key(), o.entry.key());
+            }
 
             return res;
         }
@@ -190,7 +247,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
 
             EntryWrapper that = (EntryWrapper)o;
 
-            return expireTime == that.expireTime && entry.startVersion() == that.entry.startVersion();
+            return expireTime == that.expireTime && compareKeys(entry.context(), entry.key(), that.entry.key()) == 0;
 
         }
 
@@ -198,10 +255,15 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
         @Override public int hashCode() {
             int res = (int)(expireTime ^ (expireTime >>> 32));
 
-            res = 31 * res + (int)(entry.startVersion() ^ (entry.startVersion() >>> 32));
+            res = 31 * res + entry.key().hashCode();
 
             return res;
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(EntryWrapper.class, this);
+        }
     }
 
     /**
@@ -230,10 +292,9 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
         @Override public boolean add(EntryWrapper e) {
             boolean res = super.add(e);
 
-            assert res;
+            assert res : "Failed to add entry wrapper:"  + e;
 
             size.increment();
-
             return res;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 14e3d3e..fae8219 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -580,7 +580,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
                 clearIndex(prev);
 
                 // Give to GC.
-                update(null, 0L, 0L, ver);
+                update(null, 0L, 0L, ver, true);
 
                 if (swap) {
                     releaseSwap();

http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java
index 85cfb80..9321449 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java
@@ -21,6 +21,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * DHT atomic cache entry for off-heap tiered or off-heap values modes.
@@ -52,6 +54,13 @@ public class GridDhtAtomicOffHeapCacheEntry extends GridDhtAtomicCacheEntry {
     }
 
     /** {@inheritDoc} */
+    @Override protected void updateTtl(long ttl) {
+        super.updateTtl(ttl);
+
+        valPtr = 0;
+    }
+
+    /** {@inheritDoc} */
     @Override protected long offHeapPointer() {
         return valPtr;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 026fb4d..943a91a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -166,7 +166,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
 
                             if (isNew() || !valid(topVer)) {
                                 // Version does not change for load ops.
-                                update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version());
+                                update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version(), true);
 
                                 if (cctx.deferredDelete() && !isNew() && !isInternal()) {
                                     boolean deleted = val == null;
@@ -402,7 +402,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
                 if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) < 0) {
                     primaryNode(primaryNodeId, topVer);
 
-                    update(val, expireTime, ttl, ver);
+                    update(val, expireTime, ttl, ver, true);
 
                     if (cctx.deferredDelete() && !isInternal()) {
                         boolean deleted = val == null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 0d8f795..751279d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -809,161 +809,67 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         prj0 = prj0.keepBinary();
 
-        final IgniteInternalCache<K, V> prj = prj0;
+        final IgniteInternalCache prj = prj0;
 
         final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
 
         try {
             injectResources(keyValFilter);
 
-            final GridDhtCacheAdapter dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
-
-            final GridCacheAdapter cache = dht != null ? dht : cctx.cache();
-
             final ExpiryPolicy plc = cctx.expiry();
 
             final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
 
             final boolean backups = qry.includeBackups() || cctx.isReplicated();
 
-            final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
-                new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
-                    private IgniteBiTuple<K, V> next;
-
-                    private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
-
-                    private Iterator<K> iter;
+            Iterator<KeyCacheObject> keyIter;
 
-                    private GridDhtLocalPartition locPart;
+            GridDhtLocalPartition locPart = null;
 
-                    {
-                        Integer part = qry.partition();
+            Integer part = qry.partition();
 
-                        if (part == null || dht == null)
-                            iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator();
-                        else if (part < 0 || part >= cctx.affinity().partitions())
-                            iter = F.emptyIterator();
-                        else {
-                            locPart = dht.topology().localPartition(part, topVer, false);
-
-                            // double check for owning state
-                            if (locPart == null || locPart.state() != OWNING || !locPart.reserve() ||
-                                locPart.state() != OWNING)
-                                throw new GridDhtUnreservedPartitionException(part,
-                                    cctx.affinity().affinityTopologyVersion(), "Partition can not be reserved");
-
-                            iter = new Iterator<K>() {
-                                private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
+            if (part == null || cctx.isLocal())
+                keyIter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator();
+            else if (part < 0 || part >= cctx.affinity().partitions())
+                keyIter = F.emptyIterator();
+            else {
+                final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht();
 
-                                @Override public boolean hasNext() {
-                                    return iter0.hasNext();
-                                }
+                locPart = dht.topology().localPartition(part, topVer, false);
 
-                                @Override public K next() {
-                                    KeyCacheObject key = iter0.next();
+                // double check for owning state
+                if (locPart == null || locPart.state() != OWNING || !locPart.reserve() || locPart.state() != OWNING)
+                    throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(),
+                        "Partition can not be reserved");
 
-                                    return (K)cctx.unwrapBinaryIfNeeded(key, true);
-                                }
+                final GridDhtLocalPartition locPart0 = locPart;
 
-                                @Override public void remove() {
-                                    iter0.remove();
-                                }
-                            };
-                        }
+                keyIter = new Iterator<KeyCacheObject>() {
+                    private Iterator<KeyCacheObject> iter0 = locPart0.keySet().iterator();
 
-                        advance();
+                    @Override public boolean hasNext() {
+                        return iter0.hasNext();
                     }
 
-                    @Override public boolean onHasNext() {
-                        return next != null;
+                    @Override public KeyCacheObject next() {
+                        return iter0.next();
                     }
 
-                    @Override public IgniteBiTuple<K, V> onNext() {
-                        if (next == null)
-                            throw new NoSuchElementException();
-
-                        IgniteBiTuple<K, V> next0 = next;
-
-                        advance();
-
-                        return next0;
+                    @Override public void remove() {
+                        iter0.remove();
                     }
+                };
+            }
 
-                    private void advance() {
-                        IgniteBiTuple<K, V> next0 = null;
-
-                        while (iter.hasNext()) {
-                            next0 = null;
-
-                            K key = iter.next();
-
-                            V val;
-
-                            try {
-                                GridCacheEntryEx entry = cache.peekEx(key);
-
-                                CacheObject cacheVal =
-                                    entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null;
-
-                                val = (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(cacheVal, true);
-                            }
-                            catch (GridCacheEntryRemovedException e) {
-                                val = null;
-                            }
-                            catch (IgniteCheckedException e) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to peek value: " + e);
-
-                                val = null;
-                            }
-
-                            if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
-                                dht.sendTtlUpdateRequest(expiryPlc);
-
-                                expiryPlc = cctx.cache().expiryPolicy(plc);
-                            }
-
-                            if (val != null) {
-                                next0 = F.t(key, val);
-
-                                if (checkPredicate(next0))
-                                    break;
-                                else
-                                    next0 = null;
-                            }
-                        }
-
-                        next = next0 != null ?
-                            new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
-                            null;
-
-                        if (next == null)
-                            sendTtlUpdate();
-                    }
+            final GridDhtLocalPartition locPart0 = locPart;
 
+            final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
+                new PeekValueExpiryAwareIterator<K, V>(keyIter, plc, topVer, keyValFilter, qry.keepBinary()) {
                     @Override protected void onClose() {
-                        sendTtlUpdate();
+                        super.onClose();
 
-                        if (locPart != null)
-                            locPart.release();
-                    }
-
-                    private void sendTtlUpdate() {
-                        if (dht != null && expiryPlc != null) {
-                            dht.sendTtlUpdateRequest(expiryPlc);
-
-                            expiryPlc = null;
-                        }
-                    }
-
-                    private boolean checkPredicate(Map.Entry<K, V> e) {
-                        if (keyValFilter != null) {
-                            Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapBinaryIfNeeded(e, qry.keepBinary());
-
-                            return keyValFilter.apply(e0.getKey(), e0.getValue());
-                        }
-
-                        return true;
+                        if (locPart0 != null)
+                            locPart0.release();
                     }
                 };
 
@@ -975,10 +881,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 iters.add(heapIt);
 
                 if (cctx.isOffHeapEnabled())
-                    iters.add(offheapIterator(qry, backups));
+                    iters.add(offheapIterator(qry, topVer, backups, plc));
 
                 if (cctx.swap().swapEnabled())
-                    iters.add(swapIterator(qry, backups));
+                    iters.add(swapIterator(qry, topVer, backups, plc));
 
                 it = new CompoundIterator<>(iters);
             }
@@ -1032,8 +938,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @return Swap iterator.
      * @throws IgniteCheckedException If failed.
      */
-    private GridIterator<IgniteBiTuple<K, V>> swapIterator(GridCacheQueryAdapter<?> qry, boolean backups)
-        throws IgniteCheckedException {
+    private GridIterator<IgniteBiTuple<K, V>> swapIterator(
+        GridCacheQueryAdapter<?> qry,
+        AffinityTopologyVersion topVer,
+        boolean backups,
+        ExpiryPolicy expPlc
+    ) throws IgniteCheckedException {
         IgniteBiPredicate<K, V> filter = qry.scanFilter();
 
         Integer part = qry.partition();
@@ -1041,6 +951,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups) :
             cctx.swap().rawSwapIterator(part);
 
+        if (expPlc != null)
+            return scanExpiryIterator(
+                it,
+                topVer,
+                filter,
+                expPlc,
+                qry.keepBinary());
+
         return scanIterator(it, filter, qry.keepBinary());
     }
 
@@ -1049,9 +967,23 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @param backups Include backups.
      * @return Offheap iterator.
      */
-    private GridIterator<IgniteBiTuple<K, V>> offheapIterator(GridCacheQueryAdapter<?> qry, boolean backups) {
+    private GridIterator<IgniteBiTuple<K, V>> offheapIterator(
+        GridCacheQueryAdapter<?> qry,
+        AffinityTopologyVersion topVer,
+        boolean backups,
+        ExpiryPolicy expPlc
+    ) {
         IgniteBiPredicate<K, V> filter = qry.scanFilter();
 
+        if (expPlc != null) {
+            return scanExpiryIterator(
+                cctx.swap().rawOffHeapIterator(qry.partition(), true, backups),
+                topVer,
+                filter,
+                expPlc,
+                qry.keepBinary());
+        }
+
         if (cctx.offheapTiered() && filter != null) {
             OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepBinary());
 
@@ -1125,6 +1057,38 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         };
     }
 
+    private GridIteratorAdapter<IgniteBiTuple<K, V>> scanExpiryIterator(
+        final Iterator<Map.Entry<byte[], byte[]>> it,
+        AffinityTopologyVersion topVer,
+        @Nullable final IgniteBiPredicate<K, V> filter,
+        ExpiryPolicy expPlc,
+        final boolean keepBinary
+    ) {
+        Iterator <KeyCacheObject> keyIter = new Iterator<KeyCacheObject>() {
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return it.hasNext();
+            }
+
+            /** {@inheritDoc} */
+            @Override public KeyCacheObject next() {
+                try {
+                    return cctx.toCacheKeyObject(it.next().getKey());
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public void remove() {
+                it.remove();
+            }
+        };
+
+        return new PeekValueExpiryAwareIterator<>(keyIter, expPlc, topVer, filter, keepBinary);
+    }
+
     /**
      * @param o Object to inject resources to.
      * @throws IgniteCheckedException If failure occurred while injecting resources.
@@ -3115,4 +3079,145 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             false,
             keepBinary);
     }
+
+    private class PeekValueExpiryAwareIterator<K, V> extends GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
+        /** */
+        private final ExpiryPolicy plc;
+
+        /** */
+        private final GridCacheAdapter cache;
+
+        /** */
+        private final AffinityTopologyVersion topVer;
+
+        /** */
+        private final GridDhtCacheAdapter dht;
+
+        /** */
+        private final IgniteBiPredicate<K, V> keyValFilter;
+
+        /** */
+        private final boolean keepBinary;
+
+        /** */
+        private IgniteBiTuple<K, V> next;
+
+        /** */
+        private IgniteCacheExpiryPolicy expiryPlc;
+
+        /** */
+        private Iterator<KeyCacheObject> keyIt;
+
+        public PeekValueExpiryAwareIterator(
+            Iterator<KeyCacheObject> keyIt,
+            ExpiryPolicy plc,
+            AffinityTopologyVersion topVer,
+            IgniteBiPredicate<K, V> keyValFilter,
+            boolean keepBinary
+        ) {
+            this.keyIt = keyIt;
+            this.plc = plc;
+            this.topVer = topVer;
+            this.keyValFilter = keyValFilter;
+
+            dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
+            cache = dht != null ? dht : cctx.cache();
+
+            this.keepBinary = keepBinary;
+            expiryPlc = cctx.cache().expiryPolicy(plc);
+
+            advance();
+        }
+
+        @Override public boolean onHasNext() {
+            return next != null;
+        }
+
+        @Override public IgniteBiTuple<K, V> onNext() {
+            if (next == null)
+                throw new NoSuchElementException();
+
+            IgniteBiTuple<K, V> next0 = next;
+
+            advance();
+
+            return next0;
+        }
+
+        private void advance() {
+            IgniteBiTuple<K, V> next0 = null;
+
+            while (keyIt.hasNext()) {
+                next0 = null;
+
+                KeyCacheObject key = keyIt.next();
+
+                V val;
+
+                try {
+                    GridCacheEntryEx entry = cache.peekEx(key);
+
+                    CacheObject cacheVal =
+                        entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null;
+
+                    val = (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(cacheVal, true);
+                }
+                catch (GridCacheEntryRemovedException e) {
+                    val = null;
+                }
+                catch (IgniteCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to peek value: " + e);
+
+                    val = null;
+                }
+
+                if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) {
+                    dht.sendTtlUpdateRequest(expiryPlc);
+
+                    expiryPlc = cctx.cache().expiryPolicy(plc);
+                }
+
+                if (val != null) {
+                    next0 = F.t(
+                        (K)cctx.unwrapBinaryIfNeeded(key, keepBinary),
+                        (V)cctx.unwrapBinaryIfNeeded(val, keepBinary));
+
+                    if (checkPredicate(next0))
+                        break;
+                    else
+                        next0 = null;
+                }
+            }
+
+            next = next0 != null ?
+                new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
+                null;
+
+            if (next == null)
+                sendTtlUpdate();
+        }
+
+        @Override protected void onClose() {
+            sendTtlUpdate();
+        }
+
+        private void sendTtlUpdate() {
+            if (dht != null && expiryPlc != null) {
+                dht.sendTtlUpdateRequest(expiryPlc);
+
+                expiryPlc = null;
+            }
+        }
+
+        private boolean checkPredicate(Map.Entry<K, V> e) {
+            if (keyValFilter != null) {
+                Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapBinaryIfNeeded(e, keepBinary);
+
+                return keyValFilter.apply(e0.getKey(), e0.getValue());
+            }
+
+            return true;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java
index c2ee607..7ff3f26 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java
@@ -54,4 +54,9 @@ public class IgniteCacheAtomicExpiryPolicyTest extends IgniteCacheExpiryPolicyAb
     @Override protected NearCacheConfiguration nearConfiguration() {
         return null;
     }
+
+    /** {@inheritDoc} */
+    @Override public void testAccess() throws Exception {
+        super.testAccess();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java
new file mode 100644
index 0000000..f218c14
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicOffheapExpiryPolicyTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.cache.CacheMemoryMode;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicOffheapExpiryPolicyTest extends IgniteCacheAtomicExpiryPolicyTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMemoryMode memoryMode() {
+        return CacheMemoryMode.OFFHEAP_TIERED;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testAccess() throws Exception {
+        super.testAccess();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1adc02a5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 7d22206..f57d860 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -33,21 +33,30 @@ import javax.cache.expiry.CreatedExpiryPolicy;
 import javax.cache.expiry.Duration;
 import javax.cache.expiry.EternalExpiryPolicy;
 import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.expiry.ModifiedExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.PAX;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -95,7 +104,6 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
         storeMap.clear();
     }
 
-
     /** {@inheritDoc} */
     @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
         CacheConfiguration cfg = super.cacheConfiguration(gridName);
@@ -105,6 +113,11 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
 
         cfg.setExpiryPolicyFactory(factory);
 
+        cfg.setMemoryMode(memoryMode());
+
+        if (memoryMode() == CacheMemoryMode.OFFHEAP_TIERED)
+            cfg.setOffHeapMaxMemory(0);
+
         if (disableEagerTtl)
             cfg.setEagerTtl(false);
 
@@ -112,7 +125,44 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
     }
 
     /**
-     * @throws Exception If failed.
+     * @return Cache memory mode.
+     */
+    protected CacheMemoryMode memoryMode() {
+        return CacheMemoryMode.ONHEAP_TIERED;
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testCreateUpdate0() throws Exception {
+        startGrids(1);
+
+        long ttl = 60L;
+
+        final String key = "key1";
+
+        final IgniteCache<String, String> cache = jcache();
+
+        for (int i = 0; i < 1000; i++) {
+            final IgniteCache<String, String> cache0 = cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(TimeUnit.HOURS, ttl)));
+
+            cache0.put(key, key);
+
+            info("PUT DONE");
+        }
+
+        int pSize = grid(0).context().cache().internalCache(null).context().ttl().pendingSize();
+
+        assertTrue("Too many pending entries: " + pSize, pSize <= 1);
+
+        cache.remove(key);
+
+        pSize = grid(0).context().cache().internalCache(null).context().ttl().pendingSize();
+
+        assertEquals(0, pSize);
+    }
+
+    /**     * @throws Exception If failed.
      */
     public void testZeroOnCreate() throws Exception {
         factory = CreatedExpiryPolicy.factoryOf(Duration.ZERO);
@@ -307,48 +357,50 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
 
         startGrids();
 
-        for (final Integer key : keys()) {
-            log.info("Test access [key=" + key + ']');
-
-            access(key);
-        }
-
-        accessGetAll();
-
-        for (final Integer key : keys()) {
-            log.info("Test filterAccessRemove access [key=" + key + ']');
-
-            filterAccessRemove(key);
-        }
-
-        for (final Integer key : keys()) {
-            log.info("Test filterAccessReplace access [key=" + key + ']');
-
-            filterAccessReplace(key);
-        }
-
-        if (atomicityMode() == TRANSACTIONAL) {
-            TransactionConcurrency[] txModes = {PESSIMISTIC};
-
-            for (TransactionConcurrency txMode : txModes) {
-                for (final Integer key : keys()) {
-                    log.info("Test txGet [key=" + key + ", txMode=" + txMode + ']');
-
-                    txGet(key, txMode);
-                }
-            }
-
-            for (TransactionConcurrency txMode : txModes) {
-                log.info("Test txGetAll [txMode=" + txMode + ']');
-
-                txGetAll(txMode);
-            }
-        }
+//        for (final Integer key : keys()) {
+//            log.info("Test access [key=" + key + ']');
+//
+//            access(key);
+//        }
+//
+//        accessGetAll();
+//
+//        for (final Integer key : keys()) {
+//            log.info("Test filterAccessRemove access [key=" + key + ']');
+//
+//            filterAccessRemove(key);
+//        }
+//
+//        for (final Integer key : keys()) {
+//            log.info("Test filterAccessReplace access [key=" + key + ']');
+//
+//            filterAccessReplace(key);
+//        }
+//
+//        if (atomicityMode() == TRANSACTIONAL) {
+//            TransactionConcurrency[] txModes = {PESSIMISTIC};
+//
+//            for (TransactionConcurrency txMode : txModes) {
+//                for (final Integer key : keys()) {
+//                    log.info("Test txGet [key=" + key + ", txMode=" + txMode + ']');
+//
+//                    txGet(key, txMode);
+//                }
+//            }
+//
+//            for (TransactionConcurrency txMode : txModes) {
+//                log.info("Test txGetAll [txMode=" + txMode + ']');
+//
+//                txGetAll(txMode);
+//            }
+//        }
 
         IgniteCache<Integer, Integer> cache = jcache(0);
 
         Collection<Integer> putKeys = keys();
 
+        info("Put keys: " + putKeys);
+
         for (final Integer key : putKeys)
             cache.put(key, key);
 
@@ -359,10 +411,15 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
         while (it.hasNext())
             itKeys.add(it.next().getKey());
 
+        info("It keys: " + itKeys);
+
         assertTrue(itKeys.size() >= putKeys.size());
 
-        for (Integer key : itKeys)
+        for (Integer key : itKeys) {
+            info("Checking iterator key: " + key);
+
             checkTtl(key, 62_000L, true);
+        }
     }
 
     /**
@@ -1016,7 +1073,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
             ClusterNode node = grid(i).cluster().localNode();
 
             for (Integer key : keys) {
-                Object val = jcache(i).localPeek(key, CachePeekMode.ONHEAP);
+                Object val = jcache(i).localPeek(key, CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP);
 
                 if (val != null) {
                     log.info("Unexpected value [grid=" + i +
@@ -1059,51 +1116,53 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
 
             GridCacheAdapter<Object, Object> cache = grid.context().cache().internalCache();
 
-            GridCacheEntryEx e = cache.peekEx(key);
+            if (cache.context().isNear())
+                cache = cache.context().near().dht();
 
-            if (e == null && cache.context().isNear())
-                e = cache.context().near().dht().peekEx(key);
+            while (true) {
+                try {
+                    GridCacheEntryEx e = memoryMode() == CacheMemoryMode.ONHEAP_TIERED ?
+                        cache.peekEx(key) : cache.entryEx(key);
 
-            if (e != null && e.deleted()) {
-                assertEquals(0, e.ttl());
+                    if (e != null && e.deleted()) {
+                        assertEquals(0, e.ttl());
 
-                assertFalse(cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
+                        assertFalse(cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
 
-                continue;
-            }
+                        continue;
+                    }
 
-            if (e == null)
-                assertTrue("Not found " + key, !cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
-            else {
-                found = true;
+                    if (e == null)
+                        assertTrue("Not found " + key, !cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
+                    else {
+                        e.unswap();
 
-                if (wait) {
-                    final GridCacheEntryEx e0 = e;
+                        found = true;
 
-                    GridTestUtils.waitForCondition(new PA() {
-                        @Override public boolean apply() {
-                            try {
-                                return e0.ttl() == ttl;
-                            }
-                            catch (Exception e) {
-                                fail("Unexpected error: " + e);
+                        if (wait)
+                            waitTtl(cache, key, ttl);
 
-                                return true;
-                            }
-                        }
-                    }, 3000);
-                }
+                        boolean primary = cache.affinity().isPrimary(grid.localNode(), key);
+                        boolean backup = cache.affinity().isBackup(grid.localNode(), key);
 
-                boolean primary = cache.affinity().isPrimary(grid.localNode(), key);
-                boolean backup = cache.affinity().isBackup(grid.localNode(), key);
+                        assertEquals("Unexpected ttl [grid=" + i + ", key=" + key + ", e=" + e +
+                                ", primary=" + primary + ", backup=" + backup + ']', ttl, e.ttl());
 
-                assertEquals("Unexpected ttl [grid=" + i + ", key=" + key + ", e=" + e +
-                    ", primary=" + primary + ", backup=" + backup + ']', ttl, e.ttl());
+                        if (ttl > 0)
+                            assertTrue(e.expireTime() > 0);
+                        else
+                            assertEquals(0, e.expireTime());
+                    }
 
-                if (ttl > 0)
-                    assertTrue(e.expireTime() > 0);
-                else
-                    assertEquals(0, e.expireTime());
+                    break;
+                }
+                catch (GridCacheEntryRemovedException ignore) {
+                    // Retry.
+                }
+                catch (GridDhtInvalidPartitionException ignore) {
+                    // No need to check.
+                    break;
+                }
             }
         }
 
@@ -1111,6 +1170,40 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
     }
 
     /**
+     * @param cache Cache.
+     * @param key Key.
+     * @param ttl TTL to wait.
+     * @throws IgniteInterruptedCheckedException If wait has been interrupted.
+     */
+    private void waitTtl(final GridCacheAdapter<Object, Object> cache, final Object key, final long ttl)
+        throws IgniteInterruptedCheckedException {
+        GridTestUtils.waitForCondition(new PAX() {
+            @Override public boolean applyx() throws IgniteCheckedException {
+                GridCacheEntryEx entry = null;
+
+                while (true) {
+                    try {
+                        entry = memoryMode() == CacheMemoryMode.ONHEAP_TIERED ?
+                                cache.peekEx(key) : cache.entryEx(key);
+
+                        assert entry != null;
+
+                        entry.unswap();
+
+                        return entry.ttl() == ttl;
+                    }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        // Retry.
+                    }
+                    catch (GridDhtInvalidPartitionException ignore) {
+                        return true;
+                    }
+                }
+            }
+        }, 3000);
+    }
+
+    /**
      *
      */
     private static class GetEntryProcessor implements EntryProcessor<Integer, Integer, Integer> {
@@ -1153,6 +1246,8 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
 
         /** {@inheritDoc} */
         @Override public Duration getExpiryForAccess() {
+            U.dumpStack();
+
             return access != null ? new Duration(TimeUnit.MILLISECONDS, access) : null;
         }