You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/12 14:44:57 UTC

[2/4] ignite git commit: ignite-4932 WIP

ignite-4932 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59c9707c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59c9707c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59c9707c

Branch: refs/heads/ignite-4932
Commit: 59c9707ccef73d2cce5ba7171225be995c247276
Parents: f9f4256
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 12 16:00:12 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 12 17:00:40 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 152 ++++++++----
 .../processors/cache/GridCacheContext.java      |   9 +
 .../processors/cache/GridCacheEventManager.java |  24 ++
 .../processors/cache/GridCacheMapEntry.java     |  45 ++--
 .../processors/cache/GridCacheSwapManager.java  |  22 ++
 .../dht/GridPartitionedGetFuture.java           | 152 +++++++-----
 .../dht/GridPartitionedSingleGetFuture.java     | 139 +++++++----
 .../dht/atomic/GridDhtAtomicCache.java          | 230 ++++++++++++-------
 .../dht/colocated/GridDhtColocatedCache.java    | 228 ++++++++++--------
 .../local/atomic/GridLocalAtomicCache.java      | 197 ++++++++++------
 .../cache/IgniteCacheNoSyncForGetTest.java      |  77 ++++++-
 11 files changed, 832 insertions(+), 443 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 27a5750..5042f77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1908,80 +1908,130 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
                 Map<KeyCacheObject, EntryGetResult> misses = null;
 
+                boolean offheapRead = ctx.offheapRead(expiry, readerArgs != null);
+
                 for (KeyCacheObject key : keys) {
                     while (true) {
-                        GridCacheEntryEx entry = needEntry ? entryEx(key) : peekEx(key);
-
-                        if (entry == null) {
-                            if (!skipVals && ctx.config().isStatisticsEnabled())
-                                ctx.cache().metrics0().onRead(false);
-
-                            break;
-                        }
-
                         try {
-                            EntryGetResult res;
+                            EntryGetResult res = null;
 
                             boolean evt = !skipVals;
                             boolean updateMetrics = !skipVals;
 
-                            if (storeEnabled) {
-                                res = entry.innerGetAndReserveForLoad(ctx.isSwapOrOffheapEnabled(),
-                                    updateMetrics,
-                                    evt,
-                                    subjId,
-                                    taskName,
-                                    expiry,
-                                    !deserializeBinary,
-                                    readerArgs);
+                            GridCacheEntryEx entry = null;
+
+                            boolean skipEntry;
+
+                            if (offheapRead) {
+                                GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(key);
 
-                                assert res != null;
+                                if (swapEntry != null) {
+                                    skipEntry = true;
 
-                                if (res.value() == null) {
-                                    if (misses == null)
-                                        misses = new HashMap<>();
+                                    long expireTime = swapEntry.expireTime();
 
-                                    misses.put(key, res);
+                                    if (expireTime != 0) {
+                                        if (expireTime - U.currentTimeMillis() > 0) {
+                                            res = new EntryGetWithTtlResult(swapEntry.value(),
+                                                swapEntry.version(),
+                                                false,
+                                                expireTime,
+                                                swapEntry.ttl());
+                                        }
+                                        else
+                                            skipEntry = false; // Do not skip entry if need process expiration.
+                                    }
+                                    else
+                                        res = new EntryGetResult(swapEntry.value(), swapEntry.version(), false);
+                                }
+                                else
+                                    skipEntry = !storeEnabled;
+
+                                if (skipEntry) {
+                                    if (evt) {
+                                        ctx.events().readEvent(key,
+                                            null,
+                                            swapEntry != null ? swapEntry.value() : null,
+                                            subjId,
+                                            taskName,
+                                            !deserializeBinary);
+                                    }
 
-                                    res = null;
+                                    if (updateMetrics && ctx.cache().configuration().isStatisticsEnabled())
+                                        ctx.cache().metrics0().onRead(swapEntry != null);
                                 }
                             }
-                            else {
-                                if (needVer || readerArgs != null) {
-                                    res = entry.innerGetVersioned(
-                                        null,
-                                        null,
-                                        ctx.isSwapOrOffheapEnabled(),
-                                       /*unmarshal*/true,
+                            else
+                                skipEntry = false;
+
+                            if (!skipEntry) {
+                                entry = needEntry ? entryEx(key) : peekEx(key);
+
+                                if (entry == null) {
+                                    if (!skipVals && ctx.config().isStatisticsEnabled())
+                                        ctx.cache().metrics0().onRead(false);
+
+                                    break;
+                                }
+
+                                if (storeEnabled) {
+                                    res = entry.innerGetAndReserveForLoad(ctx.isSwapOrOffheapEnabled(),
                                         updateMetrics,
                                         evt,
                                         subjId,
-                                        null,
                                         taskName,
                                         expiry,
                                         !deserializeBinary,
                                         readerArgs);
+
+                                    assert res != null;
+
+                                    if (res.value() == null) {
+                                        if (misses == null)
+                                            misses = new HashMap<>();
+
+                                        misses.put(key, res);
+
+                                        res = null;
+                                    }
                                 }
                                 else {
-                                    CacheObject val = entry.innerGet(
-                                        null,
-                                        null,
-                                        ctx.isSwapOrOffheapEnabled(),
-                                        false,
-                                        updateMetrics,
-                                        evt,
-                                        false,
-                                        subjId,
-                                        null,
-                                        taskName,
-                                        expiry,
-                                        !deserializeBinary);
+                                    if (needVer || readerArgs != null) {
+                                        res = entry.innerGetVersioned(
+                                            null,
+                                            null,
+                                            ctx.isSwapOrOffheapEnabled(),
+                                            /*unmarshal*/true,
+                                            updateMetrics,
+                                            evt,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiry,
+                                            !deserializeBinary,
+                                            readerArgs);
+                                    }
+                                    else {
+                                        CacheObject val = entry.innerGet(
+                                            null,
+                                            null,
+                                            ctx.isSwapOrOffheapEnabled(),
+                                            false,
+                                            updateMetrics,
+                                            evt,
+                                            false,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiry,
+                                            !deserializeBinary);
+
+                                        res = val != null ? new EntryGetResult(val, null) : null;
+                                    }
 
-                                    res = val != null ? new EntryGetResult(val, null) : null;
+                                    if (res == null)
+                                        ctx.evicts().touch(entry, topVer);
                                 }
-
-                                if (res == null)
-                                    ctx.evicts().touch(entry, topVer);
                             }
 
                             if (res != null) {
@@ -1994,7 +2044,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                     true,
                                     needVer);
 
-                                if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
+                                if (entry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)))
                                     ctx.evicts().touch(entry, topVer);
 
                                 if (keysSize == 1)

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 3b44b50..0985161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2058,6 +2058,15 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
+     * @param expiryPlc Expiry policy of the read; the result can only be {@code true} when it is {@code null}.
+     * @param readers {@code true} if reader arguments are supplied; the result can only be {@code true} when unset.
+     * @return {@code true} iff {@code offheapTiered()}, {@code isSwapOrOffheapEnabled()}, no expiry policy, no readers.
+     */
+    public boolean offheapRead(IgniteCacheExpiryPolicy expiryPlc, boolean readers) {
+        return offheapTiered() && isSwapOrOffheapEnabled() && expiryPlc == null && !readers;
+    }
+
+    /**
      * @param part Partition.
      * @param affNodes Affinity nodes.
      * @param topVer Topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index 1c18738..8953b63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED;
@@ -61,6 +62,29 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
         cctx.gridEvents().removeLocalEventListener(lsnr);
     }
 
+    public void readEvent(KeyCacheObject key, // Fires EVT_CACHE_OBJECT_READ for the key if that type is recordable.
+        IgniteInternalTx tx,
+        CacheObject val, // Value that was read; callers in this commit pass null when nothing was found.
+        UUID subjId,
+        String taskName,
+        boolean keepBinary) {
+        if (isRecordable(EVT_CACHE_OBJECT_READ)) { // Nothing is computed or added when the type is not recordable.
+            addEvent(cctx.affinity().partition(key),
+                key,
+                tx,
+                null, // Slot where GridCacheMapEntry passes mvcc.anyOwner(); no owner is supplied here.
+                EVT_CACHE_OBJECT_READ,
+                val, // The read value fills both value slots of addEvent,
+                val != null,
+                val, // each followed by what is presumably its "has value" flag.
+                val != null,
+                subjId,
+                null, // Slot where GridCacheMapEntry passes the transform closure class name; none here.
+                taskName,
+                keepBinary);
+        }
+    }
+
     /**
      * @param part Partition.
      * @param key Key for the event.

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 34f8b96..b9ebed3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -874,31 +874,36 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         Object res = null;
 
-        // TODO IGNITE-4932: metrics/events.
+        if (readerArgs == null && expiryPlc == null && !retVer && cctx.config().isEagerTtl()) {
+            // Fast heap get without 'synchronized'.
+            CacheObject val0 = this.val;
 
-        if (readerArgs == null && expiryPlc == null) {
-            if (!retVer && cctx.config().isEagerTtl()) { // Fast heap get.
-                CacheObject val0 = this.val;
-
-                if (val0 != null)
-                    return val0;
-            }
+            if (val0 != null) {
+                if (updateMetrics && cctx.cache().configuration().isStatisticsEnabled())
+                    cctx.cache().metrics0().onRead(true);
 
-            if (cctx.isSwapOrOffheapEnabled() && readSwap) {
-                GridCacheSwapEntry swapEntry = cctx.swap().read(this, false, true, true, false);
+                if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+                    transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
 
-                if (swapEntry != null) {
-                    long expireTime = swapEntry.expireTime();
+                    GridCacheMvcc mvcc = mvccExtras();
 
-                    if (expireTime != 0) {
-                        if (expireTime - U.currentTimeMillis() > 0) {
-                            return retVer ? new EntryGetWithTtlResult(val, ver, false, expireTime, swapEntry.ttl()) :
-                                swapEntry.value();
-                        }
-                    }
-                    else
-                        return retVer ? new EntryGetResult(val, ver, false) : swapEntry.value();
+                    cctx.events().addEvent(
+                        partition(),
+                        key,
+                        tx,
+                        mvcc != null ? mvcc.anyOwner() : null,
+                        EVT_CACHE_OBJECT_READ,
+                        val0,
+                        true,
+                        val0,
+                        true,
+                        subjId,
+                        transformClo != null ? transformClo.getClass().getName() : null,
+                        taskName,
+                        keepBinary);
                 }
+
+                return val0;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 159b3b8..07edaff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -819,6 +819,28 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param key Key to read; its value bytes and affinity partition are passed on to {@code read}.
+     * @return Entry returned by {@code read}, possibly {@code null}; a non-null entry has a value and a version.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public GridCacheSwapEntry readSwapEntry(KeyCacheObject key) throws IgniteCheckedException {
+        assert offheapEnabled || swapEnabled; // Only meaningful when offheap or swap is enabled.
+
+        GridCacheSwapEntry entry = read(key,
+            key.valueBytes(cctx.cacheObjectContext()),
+            cctx.affinity().partition(key),
+            false, // NOTE(review): meaning of these four flags is not visible here; the same values
+            true, // (false, true, true, false) were used by the swap read removed from
+            true, // GridCacheMapEntry in this commit - confirm against read(...) signature.
+            false);
+
+        assert entry == null || entry.value() != null : entry; // A found entry must carry a value...
+        assert entry == null || entry.version() != null : entry; // ...and a version.
+
+        return entry;
+    }
+
+    /**
      * @param entry Entry to read.
      * @return Read value address.
      * @throws IgniteCheckedException If read failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 519239a..798e2dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -437,81 +438,118 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
 
         GridDhtCacheAdapter<K, V> cache = cache();
 
-        while (true) {
-            GridCacheEntryEx entry;
+        boolean offheapRead = cctx.offheapRead(expiryPlc, false);
+        boolean evt = !skipVals;
 
+        while (true) {
             try {
-                entry = cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key);
+                boolean skipEntry;
 
-                // If our DHT cache do has value, then we peek it.
-                if (entry != null) {
-                    boolean isNew = entry.isNewLocked();
+                EntryGetResult getRes = null;
+                CacheObject v = null;
+                GridCacheVersion ver = null;
 
-                    EntryGetResult getRes = null;
-                    CacheObject v = null;
-                    GridCacheVersion ver = null;
+                if (offheapRead) {
+                    skipEntry = true;
 
-                    if (needVer) {
-                        getRes = entry.innerGetVersioned(
-                            null,
-                            null,
-                            /*swap*/true,
-                            /*unmarshal*/true,
-                            /**update-metrics*/false,
-                            /*event*/!skipVals,
-                            subjId,
-                            null,
-                            taskName,
-                            expiryPlc,
-                            !deserializeBinary,
-                            null);
+                    GridCacheSwapEntry swapEntry = cctx.swap().readSwapEntry(key);
+
+                    if (swapEntry != null) {
+                        long expireTime = swapEntry.expireTime();
+
+                        if (expireTime == 0 || expireTime < U.currentTimeMillis()) {
+                            v = swapEntry.value();
 
-                        if (getRes != null) {
-                            v = getRes.value();
-                            ver = getRes.version();
+                            if (needVer)
+                                ver = swapEntry.version();
                         }
+                        else
+                            skipEntry = false;
                     }
-                    else {
-                        v = entry.innerGet(
-                            null,
+
+                    if (skipEntry && evt) {
+                        cctx.events().readEvent(key,
                             null,
-                            /*swap*/true,
-                            /*read-through*/false,
-                            /**update-metrics*/false,
-                            /*event*/!skipVals,
-                            /*temporary*/false,
+                            swapEntry != null ? swapEntry.value() : null,
                             subjId,
-                            null,
                             taskName,
-                            expiryPlc,
                             !deserializeBinary);
                     }
+                }
+                else
+                    skipEntry = false;
+
+                if (!skipEntry) {
+                    GridCacheEntryEx entry =
+                        cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key);
+
+                    // If our DHT cache does have a value, then we peek it.
+                    if (entry != null) {
+                        boolean isNew = entry.isNewLocked();
+
+                        if (needVer) {
+                            getRes = entry.innerGetVersioned(
+                                null,
+                                null,
+                                /*swap*/true,
+                                /*unmarshal*/true,
+                                /*update-metrics*/false,
+                                /*event*/evt,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc,
+                                !deserializeBinary,
+                                null);
+
+                            if (getRes != null) {
+                                v = getRes.value();
+                                ver = getRes.version();
+                            }
+                        }
+                        else {
+                            v = entry.innerGet(
+                                null,
+                                null,
+                                /*swap*/true,
+                                /*read-through*/false,
+                                /*update-metrics*/false,
+                                /*event*/evt,
+                                /*temporary*/false,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc,
+                                !deserializeBinary);
+                        }
 
-                    cache.context().evicts().touch(entry, topVer);
+                        cache.context().evicts().touch(entry, topVer);
 
-                    // Entry was not in memory or in swap, so we remove it from cache.
-                    if (v == null) {
-                        if (isNew && entry.markObsoleteIfEmpty(ver))
-                            cache.removeEntry(entry);
-                    }
-                    else {
-                        cctx.addResult(locVals,
-                            key,
-                            v,
-                            skipVals,
-                            keepCacheObjects,
-                            deserializeBinary,
-                            true,
-                            getRes,
-                            ver,
-                            0,
-                            0,
-                            needVer);
-
-                        return true;
+                        // Entry was not in memory or in swap, so we remove it from cache.
+                        if (v == null) {
+                            if (isNew && entry.markObsoleteIfEmpty(ver))
+                                cache.removeEntry(entry);
+                        }
                     }
                 }
 
+                if (v != null) {
+                    cctx.addResult(locVals,
+                        key,
+                        v,
+                        skipVals,
+                        keepCacheObjects,
+                        deserializeBinary,
+                        true,
+                        getRes,
+                        ver,
+                        0,
+                        0,
+                        needVer);
+
+                    return true;
+                }
+
                 boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
 
                 // Entry not found, do not continue search if topology did not change and there is no store.

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index a3f6b72..11d4fc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
@@ -360,74 +361,110 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
 
         GridDhtCacheAdapter colocated = cctx.dht();
 
-        while (true) {
-            GridCacheEntryEx entry;
+        boolean offheapRead = cctx.offheapRead(expiryPlc, false);
+        boolean evt = !skipVals;
 
+        while (true) {
             try {
-                entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
-                    colocated.peekEx(key);
+                CacheObject v = null;
+                GridCacheVersion ver = null;
 
-                // If our DHT cache do has value, then we peek it.
-                if (entry != null) {
-                    boolean isNew = entry.isNewLocked();
+                boolean skipEntry;
 
-                    CacheObject v = null;
-                    GridCacheVersion ver = null;
+                if (offheapRead) {
+                    skipEntry = true;
 
-                    if (needVer) {
-                        EntryGetResult res = entry.innerGetVersioned(
-                            null,
-                            null,
-                            /*swap*/true,
-                            /*unmarshal*/true,
-                            /**update-metrics*/false,
-                            /*event*/!skipVals,
-                            subjId,
-                            null,
-                            taskName,
-                            expiryPlc,
-                            true,
-                            null);
+                    GridCacheSwapEntry swapEntry = cctx.swap().readSwapEntry(key);
 
-                        if (res != null) {
-                            v = res.value();
-                            ver = res.version();
+                    if (swapEntry != null) {
+                        long expireTime = swapEntry.expireTime();
+                        // Zero is treated as "never expires"; otherwise the value must not be expired yet.
+                        if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
+                            v = swapEntry.value();
+
+                            if (needVer)
+                                ver = swapEntry.version();
                         }
+                        else
+                            skipEntry = false; // Expired: fall through to the regular entry read below.
                     }
-                    else {
-                        v = entry.innerGet(
-                            null,
+
+                    if (skipEntry && evt) {
+                        cctx.events().readEvent(key,
                             null,
-                            /*swap*/true,
-                            /*read-through*/false,
-                            /**update-metrics*/false,
-                            /*event*/!skipVals,
-                            /*temporary*/false,
+                            swapEntry != null ? swapEntry.value() : null,
                             subjId,
-                            null,
                             taskName,
-                            expiryPlc,
-                            true);
+                            !deserializeBinary);
                     }
+                }
+                else
+                    skipEntry = false;
+
+                if (!skipEntry) {
+                    GridCacheEntryEx entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
+                        colocated.peekEx(key);
+
+                    // If our DHT cache has a value, then we peek it.
+                    if (entry != null) {
+                        boolean isNew = entry.isNewLocked();
+
+                        if (needVer) {
+                            EntryGetResult res = entry.innerGetVersioned(
+                                null,
+                                null,
+                                /*swap*/true,
+                                /*unmarshal*/true,
+                                /*update-metrics*/false,
+                                /*event*/evt,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc,
+                                true,
+                                null);
+
+                            if (res != null) {
+                                v = res.value();
+                                ver = res.version();
+                            }
+                        }
+                        else {
+                            v = entry.innerGet(
+                                null,
+                                null,
+                                /*swap*/true,
+                                /*read-through*/false,
+                                /*update-metrics*/false,
+                                /*event*/evt,
+                                /*temporary*/false,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc,
+                                true);
+                        }
 
-                    colocated.context().evicts().touch(entry, topVer);
+                        colocated.context().evicts().touch(entry, topVer);
 
-                    // Entry was not in memory or in swap, so we remove it from cache.
-                    if (v == null) {
-                        if (isNew && entry.markObsoleteIfEmpty(ver))
-                            colocated.removeEntry(entry);
+                        if (v == null) {
+                            // Entry was not in memory or in swap, so we remove it from cache.
+                            if (isNew && entry.markObsoleteIfEmpty(ver))
+                                colocated.removeEntry(entry);
+                        }
                     }
-                    else {
-                        if (!skipVals && cctx.config().isStatisticsEnabled())
-                            cctx.cache().metrics0().onRead(true);
+                }
 
-                        if (!skipVals)
-                            setResult(v, ver);
-                        else
-                            setSkipValueResult(true, ver);
+                if (v != null) {
+                    if (!skipVals && cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onRead(true);
 
-                        return true;
-                    }
+                    if (!skipVals)
+                        setResult(v, ver);
+                    else
+                        setSkipValueResult(true, ver);
+
+                    return true;
                 }
 
                 boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8523366..c6bceef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
 import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
@@ -1565,114 +1566,165 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
 
+        boolean evt = !skipVals;
+
         // Optimisation: try to resolve value locally and escape 'get future' creation.
         if (!forcePrimary && ctx.affinityNode()) {
-            Map<K, V> locVals = U.newHashMap(keys.size());
-
-            boolean success = true;
-
-            // Optimistically expect that all keys are available locally (avoid creation of get future).
-            for (KeyCacheObject key : keys) {
-                GridCacheEntryEx entry = null;
-
-                while (true) {
-                    try {
-                        entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
-
-                        // If our DHT cache do has value, then we peek it.
-                        if (entry != null) {
-                            boolean isNew = entry.isNewLocked();
-
-                            EntryGetResult getRes = null;
-                            CacheObject v = null;
-                            GridCacheVersion ver = null;
-
-                            if (needVer) {
-                                getRes = entry.innerGetVersioned(
-                                    null,
-                                    null,
-                                    /*swap*/true,
-                                    /*unmarshal*/true,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiry,
+            try {
+                Map<K, V> locVals = U.newHashMap(keys.size());
+
+                boolean success = true;
+                boolean offheapRead = ctx.offheapRead(expiry, false);
+
+                // Optimistically expect that all keys are available locally (avoid creation of get future).
+                for (KeyCacheObject key : keys) {
+                    if (offheapRead) {
+                        GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(key);
+
+                        if (swapEntry != null) {
+                            long expireTime = swapEntry.expireTime();
+                            // Zero is treated as "never expires"; otherwise the value must not be expired yet.
+                            if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
+                                ctx.addResult(locVals,
+                                    key,
+                                    swapEntry.value(),
+                                    skipVals,
+                                    false,
+                                    deserializeBinary,
                                     true,
-                                    null);
-
-                                if (getRes != null) {
-                                    v = getRes.value();
-                                    ver = getRes.version();
-                                }
-                            }
-                            else {
-                                v = entry.innerGet(null,
-                                    null,
-                                    /*swap*/true,
-                                    /*read-through*/false,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    /*temporary*/false,
-                                    subjId,
                                     null,
-                                    taskName,
-                                    expiry,
-                                    !deserializeBinary);
-                            }
-
-                            // Entry was not in memory or in swap, so we remove it from cache.
-                            if (v == null) {
-                                GridCacheVersion obsoleteVer = context().versions().next();
-
-                                if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
-                                    removeEntry(entry);
-
-                                success = false;
+                                    swapEntry.version(),
+                                    0,
+                                    0,
+                                    needVer);
+
+                                if (evt) {
+                                    ctx.events().readEvent(key,
+                                        null,
+                                        swapEntry.value(),
+                                        subjId,
+                                        taskName,
+                                        !deserializeBinary);
+                                }
                             }
                             else
-                                ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true,
-                                    getRes, ver, 0, 0, needVer);
+                                success = false;
                         }
                         else
                             success = false;
-
-                        break; // While.
                     }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        // No-op, retry.
-                    }
-                    catch (GridDhtInvalidPartitionException ignored) {
-                        success = false;
+                    else {
+                        GridCacheEntryEx entry = null;
+
+                        while (true) {
+                            try {
+                                entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
+
+                                // If our DHT cache has a value, then we peek it.
+                                if (entry != null) {
+                                    boolean isNew = entry.isNewLocked();
+
+                                    EntryGetResult getRes = null;
+                                    CacheObject v = null;
+                                    GridCacheVersion ver = null;
+
+                                    if (needVer) {
+                                        getRes = entry.innerGetVersioned(
+                                            null,
+                                            null,
+                                            /*swap*/true,
+                                            /*unmarshal*/true,
+                                            /*update-metrics*/false,
+                                            /*event*/evt,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiry,
+                                            true,
+                                            null);
+
+                                        if (getRes != null) {
+                                            v = getRes.value();
+                                            ver = getRes.version();
+                                        }
+                                    }
+                                    else {
+                                        v = entry.innerGet(null,
+                                            null,
+                                            /*swap*/true,
+                                            /*read-through*/false,
+                                            /*update-metrics*/false,
+                                            /*event*/evt,
+                                            /*temporary*/false,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiry,
+                                            !deserializeBinary);
+                                    }
+
+                                    // Entry was not in memory or in swap, so we remove it from cache.
+                                    if (v == null) {
+                                        if (isNew && entry.markObsoleteIfEmpty(context().versions().next()))
+                                            removeEntry(entry);
+
+                                        success = false;
+                                    }
+                                    else {
+                                        ctx.addResult(locVals,
+                                            key,
+                                            v,
+                                            skipVals,
+                                            false,
+                                            deserializeBinary,
+                                            true,
+                                            getRes,
+                                            ver,
+                                            0,
+                                            0,
+                                            needVer);
+                                    }
+                                }
+                                else
+                                    success = false;
 
-                        break; // While.
-                    }
-                    catch (IgniteCheckedException e) {
-                        return new GridFinishedFuture<>(e);
-                    }
-                    finally {
-                        if (entry != null)
-                            ctx.evicts().touch(entry, topVer);
+                                break; // While.
+                            }
+                            catch (GridCacheEntryRemovedException ignored) {
+                                // No-op, retry.
+                            }
+                            catch (GridDhtInvalidPartitionException ignored) {
+                                success = false;
+
+                                break; // While.
+                            }
+                            finally {
+                                if (entry != null)
+                                    ctx.evicts().touch(entry, topVer);
+                            }
+                        }
                     }
+
+                    if (!success)
+                        break; // Stop at the first key that cannot be served locally.
+                    else if (!skipVals && ctx.config().isStatisticsEnabled())
+                        metrics0().onRead(true);
                 }
 
-                if (!success)
-                    break;
-                else if (!skipVals && ctx.config().isStatisticsEnabled())
-                    metrics0().onRead(true);
-            }
+                if (success) {
+                    sendTtlUpdateRequest(expiry);
 
-            if (success) {
-                sendTtlUpdateRequest(expiry);
+                    return new GridFinishedFuture<>(locVals);
+                }
 
-                return new GridFinishedFuture<>(locVals);
+                if (expiry != null)
+                    expiry.reset();
+            }
+            catch (IgniteCheckedException e) {
+                return new GridFinishedFuture<>(e);
             }
         }
 
-        if (expiry != null)
-            expiry.reset();
-
         // Either reload or not all values are available locally.
         GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx,
             keys,

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index c8556e5..4b1dd9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
@@ -451,121 +452,162 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
         // Optimisation: try to resolve value locally and escape 'get future' creation.
         if (!forcePrimary) {
-            Map<K, V> locVals = null;
+            try {
+                Map<K, V> locVals = null;
 
-            boolean success = true;
+                boolean success = true;
+                boolean offheapRead = ctx.offheapRead(expiryPlc, false);
+                boolean evt = !skipVals;
 
-            // Optimistically expect that all keys are available locally (avoid creation of get future).
-            for (KeyCacheObject key : keys) {
-                GridCacheEntryEx entry = null;
-
-                while (true) {
-                    try {
-                        entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
-
-                        // If our DHT cache do has value, then we peek it.
-                        if (entry != null) {
-                            boolean isNew = entry.isNewLocked();
-
-                            EntryGetResult getRes = null;
-                            CacheObject v = null;
-                            GridCacheVersion ver = null;
-
-                            if (needVer) {
-                                getRes = entry.innerGetVersioned(
-                                    null,
-                                    null,
-                                    /*swap*/true,
-                                    /*unmarshal*/true,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary,
-                                    null);
-
-                                if (getRes != null) {
-                                    v = getRes.value();
-                                    ver = getRes.version();
-                                }
-                            }
-                            else {
-                                v = entry.innerGet(
-                                    null,
-                                    null,
-                                    /*swap*/true,
-                                    /*read-through*/false,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    /*temporary*/false,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary);
-                            }
-
-                            // Entry was not in memory or in swap, so we remove it from cache.
-                            if (v == null) {
-                                GridCacheVersion obsoleteVer = context().versions().next();
+                // Optimistically expect that all keys are available locally (avoid creation of get future).
+                for (KeyCacheObject key : keys) {
+                    if (offheapRead) {
+                        GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(key);
 
-                                if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
-                                    removeEntry(entry);
-
-                                success = false;
-                            }
-                            else {
-                                if (locVals == null)
-                                    locVals = U.newHashMap(keys.size());
+                        if (swapEntry != null) {
+                            long expireTime = swapEntry.expireTime(); // Zero is treated as "never expires".
 
+                            if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
+                                // Allocate the result map lazily, exactly as the entry read path does.
+                                if (locVals == null)
+                                    locVals = U.newHashMap(keys.size());
+
                                 ctx.addResult(locVals,
                                     key,
-                                    v,
+                                    swapEntry.value(),
                                     skipVals,
                                     keepCacheObj,
                                     deserializeBinary,
                                     true,
-                                    getRes,
-                                    ver,
+                                    null,
+                                    swapEntry.version(),
                                     0,
                                     0,
                                     needVer);
+
+                                if (evt) {
+                                    ctx.events().readEvent(key, null, swapEntry.value(), subjId, taskName,
+                                        !deserializeBinary);
+                                }
                             }
+                            else
+                                success = false;
                         }
                         else
                             success = false;
-
-                        break; // While.
                     }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        // No-op, retry.
-                    }
-                    catch (GridDhtInvalidPartitionException ignored) {
-                        success = false;
+                    else {
+                        GridCacheEntryEx entry = null;
+
+                        while (true) {
+                            try {
+                                entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
+
+                                // If our DHT cache has a value, then we peek it.
+                                if (entry != null) {
+                                    boolean isNew = entry.isNewLocked();
+
+                                    EntryGetResult getRes = null;
+                                    CacheObject v = null;
+                                    GridCacheVersion ver = null;
+
+                                    if (needVer) {
+                                        getRes = entry.innerGetVersioned(
+                                            null,
+                                            null,
+                                            /*swap*/true,
+                                            /*unmarshal*/true,
+                                            /*update-metrics*/false,
+                                            /*event*/evt,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiryPlc,
+                                            !deserializeBinary,
+                                            null);
+
+                                        if (getRes != null) {
+                                            v = getRes.value();
+                                            ver = getRes.version();
+                                        }
+                                    }
+                                    else {
+                                        v = entry.innerGet(
+                                            null,
+                                            null,
+                                            /*swap*/true,
+                                            /*read-through*/false,
+                                            /*update-metrics*/false,
+                                            /*event*/evt,
+                                            /*temporary*/false,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiryPlc,
+                                            !deserializeBinary);
+                                    }
+
+                                    // Entry was not in memory or in swap, so we remove it from cache.
+                                    if (v == null) {
+                                        GridCacheVersion obsoleteVer = context().versions().next();
+
+                                        if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
+                                            removeEntry(entry);
+
+                                        success = false;
+                                    }
+                                    else {
+                                        if (locVals == null)
+                                            locVals = U.newHashMap(keys.size());
+
+                                        ctx.addResult(locVals,
+                                            key,
+                                            v,
+                                            skipVals,
+                                            keepCacheObj,
+                                            deserializeBinary,
+                                            true,
+                                            getRes,
+                                            ver,
+                                            0,
+                                            0,
+                                            needVer);
+                                    }
+                                }
+                                else
+                                    success = false;
 
-                        break; // While.
-                    }
-                    catch (IgniteCheckedException e) {
-                        return new GridFinishedFuture<>(e);
-                    }
-                    finally {
-                        if (entry != null)
-                            context().evicts().touch(entry, topVer);
+                                break; // While.
+                            }
+                            catch (GridCacheEntryRemovedException ignored) {
+                                // No-op, retry.
+                            }
+                            catch (GridDhtInvalidPartitionException ignored) {
+                                success = false;
+
+                                break; // While.
+                            }
+                            finally {
+                                if (entry != null)
+                                    context().evicts().touch(entry, topVer);
+                            }
+                        }
                     }
-                }
 
-                if (!success)
-                    break;
-                else if (!skipVals && ctx.config().isStatisticsEnabled())
-                    ctx.cache().metrics0().onRead(true);
-            }
+                    if (!success)
+                        break;
+                    else if (!skipVals && ctx.config().isStatisticsEnabled())
+                        ctx.cache().metrics0().onRead(true);
+                }
 
-            if (success) {
-                sendTtlUpdateRequest(expiryPlc);
+                if (success) {
+                    sendTtlUpdateRequest(expiryPlc);
 
-                return new GridFinishedFuture<>(locVals);
+                    return new GridFinishedFuture<>(locVals);
+                }
+            }
+            catch (IgniteCheckedException e) {
+                return new GridFinishedFuture<>(e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index f86df2f..1f66fdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCachePreloader;
 import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
@@ -397,97 +398,149 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
         final IgniteCacheExpiryPolicy expiry = expiryPolicy(opCtx != null ? opCtx.expiry() : null);
 
         boolean success = true;
+        final boolean offheapRead = ctx.offheapRead(expiry, false);
+        final boolean evt = !skipVals;
 
         for (K key : keys) {
             if (key == null)
                 throw new NullPointerException("Null key.");
 
-            GridCacheEntryEx entry = null;
-
             KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
 
-            while (true) {
-                try {
-                    entry = swapOrOffheap ? entryEx(cacheKey) : peekEx(cacheKey);
+            boolean skipEntry;
 
-                    if (entry != null) {
-                        CacheObject v;
+            if (offheapRead) {
+                skipEntry = true;
 
-                        if (needVer) {
-                            EntryGetResult res = entry.innerGetVersioned(
-                                null,
-                                null,
-                                /*swap*/swapOrOffheap,
-                                /*unmarshal*/true,
-                                /**update-metrics*/false,
-                                /*event*/!skipVals,
-                                subjId,
-                                null,
-                                taskName,
-                                expiry,
-                                !deserializeBinary,
-                                null);
-
-                            if (res != null) {
-                                ctx.addResult(
-                                    vals,
-                                    cacheKey,
-                                    res,
-                                    skipVals,
-                                    false,
-                                    deserializeBinary,
-                                    true,
-                                    needVer);
-                            }
-                            else
-                                success = false;
-                        }
-                        else {
-                            v = entry.innerGet(
-                                null,
+                GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(cacheKey);
+
+                if (swapEntry != null) {
+                    long expireTime = swapEntry.expireTime();
+                    // Serve from swap only while the entry is live; 0 is treated as "no expiration".
+                    if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
+                        ctx.addResult(vals,
+                            cacheKey,
+                            swapEntry.value(),
+                            skipVals,
+                            false,
+                            deserializeBinary,
+                            true,
+                            null,
+                            swapEntry.version(),
+                            0,
+                            0,
+                            needVer);
+
+                        if (configuration().isStatisticsEnabled() && !skipVals)
+                            metrics0().onRead(true);
+
+                        if (evt) {
+                            ctx.events().readEvent(cacheKey,
                                 null,
-                                /*swap*/swapOrOffheap,
-                                /*read-through*/false,
-                                /**update-metrics*/true,
-                                /**event*/!skipVals,
-                                /**temporary*/false,
+                                swapEntry.value(),
                                 subjId,
-                                null,
                                 taskName,
-                                expiry,
                                 !deserializeBinary);
+                        }
+                    }
+                    else
+                        skipEntry = false;
+                }
+                else
+                    success = false;
+
+                if (skipEntry && !success && !storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
+                    metrics0().onRead(false);
+            }
+            else
+                skipEntry = false;
+
+            if (!skipEntry) {
+                GridCacheEntryEx entry = null;
+
+                CacheObject v;
+
+                while (true) {
+                    try {
+                        entry = swapOrOffheap ? entryEx(cacheKey) : peekEx(cacheKey);
 
-                            if (v != null) {
-                                ctx.addResult(vals,
-                                    cacheKey,
-                                    v,
-                                    skipVals,
-                                    false,
-                                    deserializeBinary,
-                                    true,
+                        if (entry != null) {
+                            if (needVer) {
+                                EntryGetResult res = entry.innerGetVersioned(
+                                    null,
+                                    null,
+                                    /*swap*/swapOrOffheap,
+                                    /*unmarshal*/true,
+                                    /*update-metrics*/false,
+                                    /*event*/!skipVals,
+                                    subjId,
                                     null,
-                                    0,
-                                    0);
+                                    taskName,
+                                    expiry,
+                                    !deserializeBinary,
+                                    null);
+
+                                if (res != null) {
+                                    ctx.addResult(
+                                        vals,
+                                        cacheKey,
+                                        res,
+                                        skipVals,
+                                        false,
+                                        deserializeBinary,
+                                        true,
+                                        needVer);
+                                }
+                                else
+                                    success = false;
+                            }
+                            else {
+                                v = entry.innerGet(
+                                    null,
+                                    null,
+                                    /*swap*/swapOrOffheap,
+                                    /*read-through*/false,
+                                    /*update-metrics*/true,
+                                    /*event*/!skipVals,
+                                    /*temporary*/false,
+                                    subjId,
+                                    null,
+                                    taskName,
+                                    expiry,
+                                    !deserializeBinary);
+
+                                if (v != null) {
+                                    ctx.addResult(vals,
+                                        cacheKey,
+                                        v,
+                                        skipVals,
+                                        false,
+                                        deserializeBinary,
+                                        true,
+                                        null,
+                                        0,
+                                        0);
+                                }
+                                else
+                                    success = false;
                             }
-                            else
-                                success = false;
                         }
-                    }
-                    else {
-                        if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
-                            metrics0().onRead(false);
+                        else {
+                            if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
+                                metrics0().onRead(false);
 
-                        success = false;
-                    }
+                            success = false;
+                        }
 
-                    break; // While.
-                }
-                catch (GridCacheEntryRemovedException ignored) {
-                    // No-op, retry.
-                }
-                finally {
-                    if (entry != null)
-                        ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
+                        break; // While.
+                    }
+                    catch (GridCacheEntryRemovedException ignored) {
+                        // No-op, retry.
+                    }
+                    finally {
+                        if (entry != null)
+                            ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
+                    }
                 }
 
                 if (!success && storeEnabled)

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
index faa63b3..0dbfc7e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -27,7 +30,6 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMemoryMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -93,37 +95,51 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testAtomicGet() throws Exception {
-        doGet(ATOMIC, ONHEAP_TIERED);
+        doGet(ATOMIC, ONHEAP_TIERED, false);
+
+        doGet(ATOMIC, ONHEAP_TIERED, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAtomicGetOffheap() throws Exception {
-        doGet(ATOMIC, OFFHEAP_TIERED);
+        doGet(ATOMIC, OFFHEAP_TIERED, false);
+
+        doGet(ATOMIC, OFFHEAP_TIERED, true);
     }
 
     /**
      * @throws Exception If failed.
      */
-    private void doGet(CacheAtomicityMode atomicityMode, CacheMemoryMode memoryMode) throws Exception {
+    private void doGet(CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode,
+        final boolean getAll) throws Exception {
         Ignite srv = ignite(0);
 
         Ignite client = ignite(1);
 
         final IgniteCache cache = client.createCache(cacheConfiguration(atomicityMode, memoryMode));
 
+        final Map<Object, Object> data = new HashMap<>();
+
+        data.put(1, 1);
+        data.put(2, 2);
+
         try {
             // Get from compute closure.
             {
-                cache.put(1, 1);
+                cache.putAll(data);
 
                 hangLatch = new CountDownLatch(1);
                 processorStartLatch = new CountDownLatch(1);
 
                 IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
                     @Override public Void call() throws Exception {
-                        cache.invoke(1, new HangEntryProcessor());
+                        if (getAll)
+                            cache.invokeAll(data.keySet(), new HangEntryProcessor());
+                        else
+                            cache.invoke(1, new HangEntryProcessor());
 
                         return null;
                     }
@@ -134,7 +150,14 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
 
                     assertTrue(wait);
 
-                    assertEquals(1, client.compute().affinityCall(cache.getName(), 1, new GetClosure(1, cache.getName())));
+                    if (getAll) {
+                        assertEquals(data, client.compute().affinityCall(cache.getName(), 1,
+                            new GetAllClosure(data.keySet(), cache.getName())));
+                    }
+                    else {
+                        assertEquals(1, client.compute().affinityCall(cache.getName(), 1,
+                            new GetClosure(1, cache.getName())));
+                    }
 
                     hangLatch.countDown();
 
@@ -147,14 +170,17 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
 
             // Local get.
             {
-                cache.put(1, 1);
+                cache.putAll(data);
 
                 hangLatch = new CountDownLatch(1);
                 processorStartLatch = new CountDownLatch(1);
 
                 IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
                     @Override public Void call() throws Exception {
-                        cache.invoke(1, new HangEntryProcessor());
+                        if (getAll)
+                            cache.invokeAll(data.keySet(), new HangEntryProcessor());
+                        else
+                            cache.invoke(1, new HangEntryProcessor());
 
                         return null;
                     }
@@ -165,7 +191,10 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
 
                     assertTrue(wait);
 
-                    assertEquals(1, srv.cache(cache.getName()).get(1));
+                    if (getAll)
+                        assertEquals(data, srv.cache(cache.getName()).getAll(data.keySet()));
+                    else
+                        assertEquals(1, srv.cache(cache.getName()).get(1));
 
                     hangLatch.countDown();
 
@@ -249,4 +278,32 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
             return ignite.cache(cacheName).get(key);
         }
     }
+
+    /**
+     * Callable that reads the given keys from the named cache via {@code getAll}.
+     */
+    public static class GetAllClosure implements IgniteCallable<Object> {
+        /** Ignite instance (annotated as an injected resource). */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Keys passed to {@code getAll}. */
+        private final Set<Object> keys;
+
+        /** Name of the cache to read from. */
+        private final String cacheName;
+
+        /**
+         * @param keys Keys to read.
+         */
+        public GetAllClosure(Set<Object> keys, String cacheName) {
+            this.keys = keys;
+            this.cacheName = cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            return ignite.cache(cacheName).getAll(keys);
+        }
+    }
 }