You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/12 14:44:56 UTC

[1/4] ignite git commit: IGNITE-4907: Fixed excessive service instances can be started with dynamic deployment. This closes #1766.

Repository: ignite
Updated Branches:
  refs/heads/ignite-4932 f9f4256ae -> f2d9ea0cc


IGNITE-4907: Fixed an issue where excessive service instances could be started with dynamic deployment. This closes #1766.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0f7ef742
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0f7ef742
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0f7ef742

Branch: refs/heads/ignite-4932
Commit: 0f7ef74216fab64f5d1d2b6d432b552b7fe40d2f
Parents: 01ceeb1
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Wed Apr 12 13:01:25 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Wed Apr 12 13:01:25 2017 +0300

----------------------------------------------------------------------
 .../service/GridServiceProcessor.java           |  2 +-
 ...ServiceProcessorMultiNodeConfigSelfTest.java | 95 +++++++++++++++++---
 .../GridServiceProcessorMultiNodeSelfTest.java  | 61 +++++++++++++
 3 files changed, 146 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0f7ef742/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index d0b2733..37bffc3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -972,7 +972,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                         int perNodeCnt = totalCnt != 0 ? totalCnt / size : maxPerNodeCnt;
                         int remainder = totalCnt != 0 ? totalCnt % size : 0;
 
-                        if (perNodeCnt > maxPerNodeCnt && maxPerNodeCnt != 0) {
+                        if (perNodeCnt >= maxPerNodeCnt && maxPerNodeCnt != 0) {
                             perNodeCnt = maxPerNodeCnt;
                             remainder = 0;
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0f7ef742/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeConfigSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeConfigSelfTest.java
index 1bd3b03..9da62c0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeConfigSelfTest.java
@@ -17,11 +17,13 @@
 
 package org.apache.ignite.internal.processors.service;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
-import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceConfiguration;
 import org.apache.ignite.testframework.GridTestUtils;
 
@@ -38,6 +40,9 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
     /** Node singleton name. */
     private static final String NODE_SINGLE_BUT_CLIENT = "serviceConfigEachNodeButClient";
 
+    /** Name of the service configured with a per-node limit of one and a total count above the node count. */
+    private static final String NODE_SINGLE_WITH_LIMIT = "serviceConfigWithLimit";
+
     /** Affinity service name. */
     private static final String AFFINITY = "serviceConfigAffinity";
 
@@ -51,7 +56,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
 
     /** {@inheritDoc} */
     @Override protected ServiceConfiguration[] services() {
-        ServiceConfiguration[] arr = new ServiceConfiguration[4];
+        List<ServiceConfiguration> cfgs = new ArrayList<>();
 
         ServiceConfiguration cfg = new ServiceConfiguration();
 
@@ -60,7 +65,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
         cfg.setTotalCount(1);
         cfg.setService(new DummyService());
 
-        arr[0] = cfg;
+        cfgs.add(cfg);
 
         cfg = new ServiceConfiguration();
 
@@ -68,7 +73,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
         cfg.setMaxPerNodeCount(1);
         cfg.setService(new DummyService());
 
-        arr[1] = cfg;
+        cfgs.add(cfg);
 
         cfg = new ServiceConfiguration();
 
@@ -79,7 +84,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
         cfg.setTotalCount(1);
         cfg.setService(new AffinityService(AFFINITY_KEY));
 
-        arr[2] = cfg;
+        cfgs.add(cfg);
 
         cfg = new ServiceConfiguration();
 
@@ -88,9 +93,18 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
         cfg.setNodeFilter(new CacheConfiguration.IgniteAllNodesPredicate());
         cfg.setService(new DummyService());
 
-        arr[3] = cfg;
+        cfgs.add(cfg);
+
+        cfg = new ServiceConfiguration();
 
-        return arr;
+        cfg.setName(NODE_SINGLE_WITH_LIMIT);
+        cfg.setMaxPerNodeCount(1);
+        cfg.setTotalCount(nodeCount() + 1);
+        cfg.setService(new DummyService());
+
+        cfgs.add(cfg);
+
+        return cfgs.toArray(new ServiceConfiguration[cfgs.size()]);
     }
 
     /** {@inheritDoc} */
@@ -107,6 +121,8 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
                         DummyService.cancelled(NODE_SINGLE) == 0 &&
                         DummyService.started(NODE_SINGLE_BUT_CLIENT) == nodeCount() &&
                         DummyService.cancelled(NODE_SINGLE_BUT_CLIENT) == 0 &&
+                        DummyService.started(NODE_SINGLE_WITH_LIMIT) >= nodeCount() &&
+                        DummyService.cancelled(NODE_SINGLE_WITH_LIMIT) == 0 &&
                         actualCount(AFFINITY, randomGrid().services().serviceDescriptors()) == 1;
                 }
             },
@@ -170,6 +186,59 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
         finally {
             stopExtraNodes(nodeCnt);
         }
+
+        checkCount(AFFINITY, g.services().serviceDescriptors(), 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeployLimits() throws Exception {
+        final Ignite g = randomGrid();
+
+        final String name = NODE_SINGLE_WITH_LIMIT;
+
+        waitForDeployment(name, nodeCount());
+
+        checkCount(name, g.services().serviceDescriptors(), nodeCount());
+
+        int extraNodes = 2;
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        DummyService.exeLatch(name, latch);
+
+        startExtraNodes(extraNodes);
+
+        try {
+            latch.await();
+
+            checkCount(name, g.services().serviceDescriptors(), nodeCount() + 1);
+        }
+        finally {
+            stopExtraNodes(extraNodes);
+        }
+
+        assertEquals(name, 1, DummyService.cancelled(name));
+
+        waitForDeployment(name, nodeCount());
+
+        checkCount(name, g.services().serviceDescriptors(), nodeCount());
+    }
+
+    /**
+     * @param srvcName Service name
+     * @param expectedDeps Expected number of service deployments
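+     * @return Value returned by {@code GridTestUtils.waitForCondition} for the deployment-count check (1500 ms timeout).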
+     */
+    private boolean waitForDeployment(final String srvcName, final int expectedDeps) throws IgniteInterruptedCheckedException {
+        final Ignite g = randomGrid();
+
+        return GridTestUtils.waitForCondition(new GridAbsPredicateX() {
+            @Override public boolean applyx() {
+                return actualCount(srvcName, g.services().serviceDescriptors())  == expectedDeps;
+            }
+        }, 1500);
     }
 
     /**
@@ -212,10 +281,6 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
         try {
             latch.await();
 
-            // Ensure service is deployed.
-            assertNotNull(grid(nodeCount() + newNodes - 1).services()
-                .serviceProxy(NODE_SINGLE_BUT_CLIENT, Service.class, false, 2000));
-
             assertEquals(name, newNodes, DummyService.started(name));
             assertEquals(name, 0, DummyService.cancelled(name));
 
@@ -224,6 +289,10 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
         finally {
             stopExtraNodes(newNodes);
         }
+
+        waitForDeployment(name, nodeCount());
+
+        checkCount(name, g.services().serviceDescriptors(), nodeCount());
     }
 
     /**
@@ -253,5 +322,9 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
         finally {
             stopExtraNodes(servers + clients);
         }
+
+        waitForDeployment(name, nodeCount());
+
+        checkCount(name, g.services().serviceDescriptors(), nodeCount());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/0f7ef742/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
index f7403dc..d133cf2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
@@ -264,4 +264,65 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
             stopGrid("client");
         }
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeployLimits() throws Exception {
+            String name = "serviceWithLimitsUpdateTopology";
+
+            Ignite g = randomGrid();
+
+            final int totalInstances = nodeCount() + 1;
+
+            CountDownLatch latch = new CountDownLatch(nodeCount());
+
+            DummyService.exeLatch(name, latch);
+
+            ServiceConfiguration srvcCfg = new ServiceConfiguration();
+
+            srvcCfg.setName(name);
+            srvcCfg.setMaxPerNodeCount(1);
+            srvcCfg.setTotalCount(totalInstances);
+            srvcCfg.setService(new DummyService());
+
+            IgniteServices svcs = g.services().withAsync();
+
+            svcs.deploy(srvcCfg);
+
+            IgniteFuture<?> fut = svcs.future();
+
+            info("Deployed service: " + name);
+
+            fut.get();
+
+            info("Finished waiting for service future: " + name);
+
+            latch.await();
+
+            TestCase.assertEquals(name, nodeCount(), DummyService.started(name));
+            TestCase.assertEquals(name, 0, DummyService.cancelled(name));
+
+            checkCount(name, g.services().serviceDescriptors(), nodeCount());
+
+            int extraNodes = 2;
+
+            latch = new CountDownLatch(1);
+
+            DummyService.exeLatch(name, latch);
+
+            startExtraNodes(2);
+
+            try {
+                latch.await();
+
+                TestCase.assertEquals(name, totalInstances, DummyService.started(name));
+                TestCase.assertEquals(name, 0, DummyService.cancelled(name));
+
+                checkCount(name, g.services().serviceDescriptors(), totalInstances);
+            }
+            finally {
+                stopExtraNodes(extraNodes);
+            }
+    }
 }
\ No newline at end of file


[3/4] ignite git commit: Merge remote-tracking branch 'remotes/community/ignite-1.9.2' into ignite-4932

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/community/ignite-1.9.2' into ignite-4932


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d1d9752
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d1d9752
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d1d9752

Branch: refs/heads/ignite-4932
Commit: 7d1d9752a5415a87c93526b99b3f3ed12860cc12
Parents: 59c9707 0f7ef74
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 12 17:11:14 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 12 17:11:14 2017 +0300

----------------------------------------------------------------------
 .../service/GridServiceProcessor.java           |  2 +-
 ...ServiceProcessorMultiNodeConfigSelfTest.java | 95 +++++++++++++++++---
 .../GridServiceProcessorMultiNodeSelfTest.java  | 61 +++++++++++++
 3 files changed, 146 insertions(+), 12 deletions(-)
----------------------------------------------------------------------



[2/4] ignite git commit: ignite-4932 WIP

Posted by sb...@apache.org.
ignite-4932 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59c9707c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59c9707c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59c9707c

Branch: refs/heads/ignite-4932
Commit: 59c9707ccef73d2cce5ba7171225be995c247276
Parents: f9f4256
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 12 16:00:12 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 12 17:00:40 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 152 ++++++++----
 .../processors/cache/GridCacheContext.java      |   9 +
 .../processors/cache/GridCacheEventManager.java |  24 ++
 .../processors/cache/GridCacheMapEntry.java     |  45 ++--
 .../processors/cache/GridCacheSwapManager.java  |  22 ++
 .../dht/GridPartitionedGetFuture.java           | 152 +++++++-----
 .../dht/GridPartitionedSingleGetFuture.java     | 139 +++++++----
 .../dht/atomic/GridDhtAtomicCache.java          | 230 ++++++++++++-------
 .../dht/colocated/GridDhtColocatedCache.java    | 228 ++++++++++--------
 .../local/atomic/GridLocalAtomicCache.java      | 197 ++++++++++------
 .../cache/IgniteCacheNoSyncForGetTest.java      |  77 ++++++-
 11 files changed, 832 insertions(+), 443 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 27a5750..5042f77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1908,80 +1908,130 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
                 Map<KeyCacheObject, EntryGetResult> misses = null;
 
+                boolean offheapRead = ctx.offheapRead(expiry, readerArgs != null);
+
                 for (KeyCacheObject key : keys) {
                     while (true) {
-                        GridCacheEntryEx entry = needEntry ? entryEx(key) : peekEx(key);
-
-                        if (entry == null) {
-                            if (!skipVals && ctx.config().isStatisticsEnabled())
-                                ctx.cache().metrics0().onRead(false);
-
-                            break;
-                        }
-
                         try {
-                            EntryGetResult res;
+                            EntryGetResult res = null;
 
                             boolean evt = !skipVals;
                             boolean updateMetrics = !skipVals;
 
-                            if (storeEnabled) {
-                                res = entry.innerGetAndReserveForLoad(ctx.isSwapOrOffheapEnabled(),
-                                    updateMetrics,
-                                    evt,
-                                    subjId,
-                                    taskName,
-                                    expiry,
-                                    !deserializeBinary,
-                                    readerArgs);
+                            GridCacheEntryEx entry = null;
+
+                            boolean skipEntry;
+
+                            if (offheapRead) {
+                                GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(key);
 
-                                assert res != null;
+                                if (swapEntry != null) {
+                                    skipEntry = true;
 
-                                if (res.value() == null) {
-                                    if (misses == null)
-                                        misses = new HashMap<>();
+                                    long expireTime = swapEntry.expireTime();
 
-                                    misses.put(key, res);
+                                    if (expireTime != 0) {
+                                        if (expireTime - U.currentTimeMillis() > 0) {
+                                            res = new EntryGetWithTtlResult(swapEntry.value(),
+                                                swapEntry.version(),
+                                                false,
+                                                expireTime,
+                                                swapEntry.ttl());
+                                        }
+                                        else
+                                            skipEntry = false; // Do not skip entry if need process expiration.
+                                    }
+                                    else
+                                        res = new EntryGetResult(swapEntry.value(), swapEntry.version(), false);
+                                }
+                                else
+                                    skipEntry = !storeEnabled;
+
+                                if (skipEntry) {
+                                    if (evt) {
+                                        ctx.events().readEvent(key,
+                                            null,
+                                            swapEntry != null ? swapEntry.value() : null,
+                                            subjId,
+                                            taskName,
+                                            !deserializeBinary);
+                                    }
 
-                                    res = null;
+                                    if (updateMetrics && ctx.cache().configuration().isStatisticsEnabled())
+                                        ctx.cache().metrics0().onRead(swapEntry != null);
                                 }
                             }
-                            else {
-                                if (needVer || readerArgs != null) {
-                                    res = entry.innerGetVersioned(
-                                        null,
-                                        null,
-                                        ctx.isSwapOrOffheapEnabled(),
-                                       /*unmarshal*/true,
+                            else
+                                skipEntry = false;
+
+                            if (!skipEntry) {
+                                entry = needEntry ? entryEx(key) : peekEx(key);
+
+                                if (entry == null) {
+                                    if (!skipVals && ctx.config().isStatisticsEnabled())
+                                        ctx.cache().metrics0().onRead(false);
+
+                                    break;
+                                }
+
+                                if (storeEnabled) {
+                                    res = entry.innerGetAndReserveForLoad(ctx.isSwapOrOffheapEnabled(),
                                         updateMetrics,
                                         evt,
                                         subjId,
-                                        null,
                                         taskName,
                                         expiry,
                                         !deserializeBinary,
                                         readerArgs);
+
+                                    assert res != null;
+
+                                    if (res.value() == null) {
+                                        if (misses == null)
+                                            misses = new HashMap<>();
+
+                                        misses.put(key, res);
+
+                                        res = null;
+                                    }
                                 }
                                 else {
-                                    CacheObject val = entry.innerGet(
-                                        null,
-                                        null,
-                                        ctx.isSwapOrOffheapEnabled(),
-                                        false,
-                                        updateMetrics,
-                                        evt,
-                                        false,
-                                        subjId,
-                                        null,
-                                        taskName,
-                                        expiry,
-                                        !deserializeBinary);
+                                    if (needVer || readerArgs != null) {
+                                        res = entry.innerGetVersioned(
+                                            null,
+                                            null,
+                                            ctx.isSwapOrOffheapEnabled(),
+                                            /*unmarshal*/true,
+                                            updateMetrics,
+                                            evt,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiry,
+                                            !deserializeBinary,
+                                            readerArgs);
+                                    }
+                                    else {
+                                        CacheObject val = entry.innerGet(
+                                            null,
+                                            null,
+                                            ctx.isSwapOrOffheapEnabled(),
+                                            false,
+                                            updateMetrics,
+                                            evt,
+                                            false,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiry,
+                                            !deserializeBinary);
+
+                                        res = val != null ? new EntryGetResult(val, null) : null;
+                                    }
 
-                                    res = val != null ? new EntryGetResult(val, null) : null;
+                                    if (res == null)
+                                        ctx.evicts().touch(entry, topVer);
                                 }
-
-                                if (res == null)
-                                    ctx.evicts().touch(entry, topVer);
                             }
 
                             if (res != null) {
@@ -1994,7 +2044,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                     true,
                                     needVer);
 
-                                if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
+                                if (entry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)))
                                     ctx.evicts().touch(entry, topVer);
 
                                 if (keysSize == 1)

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 3b44b50..0985161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2058,6 +2058,15 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
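+     * @param expiryPlc Expiry policy; must be {@code null} for this method to return {@code true}.
+     * @param readers Whether readers are involved; must be {@code false} for this method to return {@code true}.
+     * @return {@code True} if {@code offheapTiered()} and {@code isSwapOrOffheapEnabled()} hold, no expiry policy is given and there are no readers.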
+     */
+    public boolean offheapRead(IgniteCacheExpiryPolicy expiryPlc, boolean readers) {
+        return offheapTiered() && isSwapOrOffheapEnabled() && expiryPlc == null && !readers;
+    }
+
+    /**
      * @param part Partition.
      * @param affNodes Affinity nodes.
      * @param topVer Topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index 1c18738..8953b63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED;
@@ -61,6 +62,29 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
         cctx.gridEvents().removeLocalEventListener(lsnr);
     }
 
+    public void readEvent(KeyCacheObject key,
+        IgniteInternalTx tx,
+        CacheObject val,
+        UUID subjId,
+        String taskName,
+        boolean keepBinary) {
+        if (isRecordable(EVT_CACHE_OBJECT_READ)) {
+            addEvent(cctx.affinity().partition(key),
+                key,
+                tx,
+                null,
+                EVT_CACHE_OBJECT_READ,
+                val,
+                val != null,
+                val,
+                val != null,
+                subjId,
+                null,
+                taskName,
+                keepBinary);
+        }
+    }
+
     /**
      * @param part Partition.
      * @param key Key for the event.

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 34f8b96..b9ebed3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -874,31 +874,36 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         Object res = null;
 
-        // TODO IGNITE-4932: metrics/events.
+        if (readerArgs == null && expiryPlc == null && !retVer && cctx.config().isEagerTtl()) {
+            // Fast heap get without 'synchronized'.
+            CacheObject val0 = this.val;
 
-        if (readerArgs == null && expiryPlc == null) {
-            if (!retVer && cctx.config().isEagerTtl()) { // Fast heap get.
-                CacheObject val0 = this.val;
-
-                if (val0 != null)
-                    return val0;
-            }
+            if (val0 != null) {
+                if (updateMetrics && cctx.cache().configuration().isStatisticsEnabled())
+                    cctx.cache().metrics0().onRead(true);
 
-            if (cctx.isSwapOrOffheapEnabled() && readSwap) {
-                GridCacheSwapEntry swapEntry = cctx.swap().read(this, false, true, true, false);
+                if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+                    transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
 
-                if (swapEntry != null) {
-                    long expireTime = swapEntry.expireTime();
+                    GridCacheMvcc mvcc = mvccExtras();
 
-                    if (expireTime != 0) {
-                        if (expireTime - U.currentTimeMillis() > 0) {
-                            return retVer ? new EntryGetWithTtlResult(val, ver, false, expireTime, swapEntry.ttl()) :
-                                swapEntry.value();
-                        }
-                    }
-                    else
-                        return retVer ? new EntryGetResult(val, ver, false) : swapEntry.value();
+                    cctx.events().addEvent(
+                        partition(),
+                        key,
+                        tx,
+                        mvcc != null ? mvcc.anyOwner() : null,
+                        EVT_CACHE_OBJECT_READ,
+                        val0,
+                        true,
+                        val0,
+                        true,
+                        subjId,
+                        transformClo != null ? transformClo.getClass().getName() : null,
+                        taskName,
+                        keepBinary);
                 }
+
+                return val0;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 159b3b8..07edaff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -819,6 +819,28 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * Reads the swap entry for the given key.
+     * At least one of offheap or swap storage must be enabled (asserted).
+     *
+     * @param key Key to read.
+     * @return Swap entry with non-null value and version, or {@code null}.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public GridCacheSwapEntry readSwapEntry(KeyCacheObject key) throws IgniteCheckedException {
+        assert offheapEnabled || swapEnabled;
+
+        GridCacheSwapEntry swapEntry = read(key,
+            key.valueBytes(cctx.cacheObjectContext()),
+            cctx.affinity().partition(key),
+            false, true, true, false);
+
+        assert swapEntry == null || swapEntry.value() != null : swapEntry;
+        assert swapEntry == null || swapEntry.version() != null : swapEntry;
+
+        return swapEntry;
+    }
+
+    /**
      * @param entry Entry to read.
      * @return Read value address.
      * @throws IgniteCheckedException If read failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 519239a..798e2dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -437,81 +438,118 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
 
         GridDhtCacheAdapter<K, V> cache = cache();
 
-        while (true) {
-            GridCacheEntryEx entry;
+        boolean offheapRead = cctx.offheapRead(expiryPlc, false);
+        boolean evt = !skipVals;
 
+        while (true) {
             try {
-                entry = cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key);
+                boolean skipEntry;
 
-                // If our DHT cache do has value, then we peek it.
-                if (entry != null) {
-                    boolean isNew = entry.isNewLocked();
+                EntryGetResult getRes = null;
+                CacheObject v = null;
+                GridCacheVersion ver = null;
 
-                    EntryGetResult getRes = null;
-                    CacheObject v = null;
-                    GridCacheVersion ver = null;
+                if (offheapRead) {
+                    skipEntry = true;
 
-                    if (needVer) {
-                        getRes = entry.innerGetVersioned(
-                            null,
-                            null,
-                            /*swap*/true,
-                            /*unmarshal*/true,
-                            /**update-metrics*/false,
-                            /*event*/!skipVals,
-                            subjId,
-                            null,
-                            taskName,
-                            expiryPlc,
-                            !deserializeBinary,
-                            null);
+                    GridCacheSwapEntry swapEntry = cctx.swap().readSwapEntry(key);
+
+                    if (swapEntry != null) {
+                        long expireTime = swapEntry.expireTime();
+
+                        if (expireTime == 0 || expireTime > U.currentTimeMillis()) { // Eternal or not yet expired.
+                            v = swapEntry.value();
 
-                        if (getRes != null) {
-                            v = getRes.value();
-                            ver = getRes.version();
+                            if (needVer)
+                                ver = swapEntry.version();
                         }
+                        else
+                            skipEntry = false;
                     }
-                    else {
-                        v = entry.innerGet(
-                            null,
+
+                    if (skipEntry && evt) {
+                        cctx.events().readEvent(key,
                             null,
-                            /*swap*/true,
-                            /*read-through*/false,
-                            /**update-metrics*/false,
-                            /*event*/!skipVals,
-                            /*temporary*/false,
+                            swapEntry != null ? swapEntry.value() : null,
                             subjId,
-                            null,
                             taskName,
-                            expiryPlc,
                             !deserializeBinary);
                     }
+                }
+                else
+                    skipEntry = false;
+
+                if (!skipEntry) {
+                    GridCacheEntryEx entry =
+                        cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key);
+
+                    // If our DHT cache do has value, then we peek it.
+                    if (entry != null) {
+                        boolean isNew = entry.isNewLocked();
+
+                        if (needVer) {
+                            getRes = entry.innerGetVersioned(
+                                null,
+                                null,
+                                /*swap*/true,
+                                /*unmarshal*/true,
+                                /*update-metrics*/false,
+                                /*event*/evt,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc,
+                                !deserializeBinary,
+                                null);
+
+                            if (getRes != null) {
+                                v = getRes.value();
+                                ver = getRes.version();
+                            }
+                        }
+                        else {
+                            v = entry.innerGet(
+                                null,
+                                null,
+                                /*swap*/true,
+                                /*read-through*/false,
+                                /*update-metrics*/false,
+                                /*event*/evt,
+                                /*temporary*/false,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc,
+                                !deserializeBinary);
+                        }
 
-                    cache.context().evicts().touch(entry, topVer);
+                        cache.context().evicts().touch(entry, topVer);
 
-                    // Entry was not in memory or in swap, so we remove it from cache.
-                    if (v == null) {
-                        if (isNew && entry.markObsoleteIfEmpty(ver))
-                            cache.removeEntry(entry);
-                    }
-                    else {
-                        cctx.addResult(locVals,
-                            key,
-                            v,
-                            skipVals,
-                            keepCacheObjects,
-                            deserializeBinary,
-                            true,
-                            getRes,
-                            ver,
-                            0,
-                            0,
-                            needVer);
-
-                        return true;
+                        // Entry was not in memory or in swap, so we remove it from cache.
+                        if (v == null) {
+                            if (isNew && entry.markObsoleteIfEmpty(ver))
+                                cache.removeEntry(entry);
+                        }
                     }
                 }
 
+                if (v != null) {
+                    cctx.addResult(locVals,
+                        key,
+                        v,
+                        skipVals,
+                        keepCacheObjects,
+                        deserializeBinary,
+                        true,
+                        getRes,
+                        ver,
+                        0,
+                        0,
+                        needVer);
+
+                    return true;
+                }
+
                 boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
 
                 // Entry not found, do not continue search if topology did not change and there is no store.

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index a3f6b72..11d4fc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
@@ -360,74 +361,110 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
 
         GridDhtCacheAdapter colocated = cctx.dht();
 
-        while (true) {
-            GridCacheEntryEx entry;
+        boolean offheapRead = cctx.offheapRead(expiryPlc, false);
+        boolean evt = !skipVals;
 
+        while (true) {
             try {
-                entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
-                    colocated.peekEx(key);
+                CacheObject v = null;
+                GridCacheVersion ver = null;
 
-                // If our DHT cache do has value, then we peek it.
-                if (entry != null) {
-                    boolean isNew = entry.isNewLocked();
+                boolean skipEntry;
 
-                    CacheObject v = null;
-                    GridCacheVersion ver = null;
+                if (offheapRead) {
+                    skipEntry = true;
 
-                    if (needVer) {
-                        EntryGetResult res = entry.innerGetVersioned(
-                            null,
-                            null,
-                            /*swap*/true,
-                            /*unmarshal*/true,
-                            /**update-metrics*/false,
-                            /*event*/!skipVals,
-                            subjId,
-                            null,
-                            taskName,
-                            expiryPlc,
-                            true,
-                            null);
+                    GridCacheSwapEntry swapEntry = cctx.swap().readSwapEntry(key);
 
-                        if (res != null) {
-                            v = res.value();
-                            ver = res.version();
+                    if (swapEntry != null) {
+                        long expireTime = swapEntry.expireTime();
+
+                        if (expireTime == 0 || expireTime > U.currentTimeMillis()) { // Eternal or not yet expired.
+                            v = swapEntry.value();
+
+                            if (needVer)
+                                ver = swapEntry.version();
                         }
+                        else
+                            skipEntry = false;
                     }
-                    else {
-                        v = entry.innerGet(
-                            null,
+
+                    if (skipEntry && evt) {
+                        cctx.events().readEvent(key,
                             null,
-                            /*swap*/true,
-                            /*read-through*/false,
-                            /**update-metrics*/false,
-                            /*event*/!skipVals,
-                            /*temporary*/false,
+                            swapEntry != null ? swapEntry.value() : null,
                             subjId,
-                            null,
                             taskName,
-                            expiryPlc,
-                            true);
+                            !deserializeBinary);
                     }
+                }
+                else
+                    skipEntry = false;
+
+                if (!skipEntry) {
+                    GridCacheEntryEx entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
+                        colocated.peekEx(key);
+
+                    // If our DHT cache do has value, then we peek it.
+                    if (entry != null) {
+                        boolean isNew = entry.isNewLocked();
+
+                        if (needVer) {
+                            EntryGetResult res = entry.innerGetVersioned(
+                                null,
+                                null,
+                                /*swap*/true,
+                                /*unmarshal*/true,
+                                /*update-metrics*/false,
+                                /*event*/evt,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc,
+                                true,
+                                null);
+
+                            if (res != null) {
+                                v = res.value();
+                                ver = res.version();
+                            }
+                        }
+                        else {
+                            v = entry.innerGet(
+                                null,
+                                null,
+                                /*swap*/true,
+                                /*read-through*/false,
+                                /*update-metrics*/false,
+                                /*event*/evt,
+                                /*temporary*/false,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc,
+                                true);
+                        }
 
-                    colocated.context().evicts().touch(entry, topVer);
+                        colocated.context().evicts().touch(entry, topVer);
 
-                    // Entry was not in memory or in swap, so we remove it from cache.
-                    if (v == null) {
-                        if (isNew && entry.markObsoleteIfEmpty(ver))
-                            colocated.removeEntry(entry);
+                        if (v == null) {
+                            // Entry was not in memory or in swap, so we remove it from cache.
+                            if (isNew && entry.markObsoleteIfEmpty(ver))
+                                colocated.removeEntry(entry);
+                        }
                     }
-                    else {
-                        if (!skipVals && cctx.config().isStatisticsEnabled())
-                            cctx.cache().metrics0().onRead(true);
+                }
 
-                        if (!skipVals)
-                            setResult(v, ver);
-                        else
-                            setSkipValueResult(true, ver);
+                if (v != null) {
+                    if (!skipVals && cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onRead(true);
 
-                        return true;
-                    }
+                    if (!skipVals)
+                        setResult(v, ver);
+                    else
+                        setSkipValueResult(true, ver);
+
+                    return true;
                 }
 
                 boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8523366..c6bceef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
 import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
@@ -1565,114 +1566,165 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
 
+        boolean evt = !skipVals;
+
         // Optimisation: try to resolve value locally and escape 'get future' creation.
         if (!forcePrimary && ctx.affinityNode()) {
-            Map<K, V> locVals = U.newHashMap(keys.size());
-
-            boolean success = true;
-
-            // Optimistically expect that all keys are available locally (avoid creation of get future).
-            for (KeyCacheObject key : keys) {
-                GridCacheEntryEx entry = null;
-
-                while (true) {
-                    try {
-                        entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
-
-                        // If our DHT cache do has value, then we peek it.
-                        if (entry != null) {
-                            boolean isNew = entry.isNewLocked();
-
-                            EntryGetResult getRes = null;
-                            CacheObject v = null;
-                            GridCacheVersion ver = null;
-
-                            if (needVer) {
-                                getRes = entry.innerGetVersioned(
-                                    null,
-                                    null,
-                                    /*swap*/true,
-                                    /*unmarshal*/true,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiry,
+            try {
+                Map<K, V> locVals = U.newHashMap(keys.size());
+
+                boolean success = true;
+                boolean offheapRead = ctx.offheapRead(expiry, false);
+
+                // Optimistically expect that all keys are available locally (avoid creation of get future).
+                for (KeyCacheObject key : keys) {
+                    if (offheapRead) {
+                        GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(key);
+
+                        if (swapEntry != null) {
+                            long expireTime = swapEntry.expireTime();
+
+                            if (expireTime == 0 || expireTime > U.currentTimeMillis()) { // Eternal or not yet expired.
+                                ctx.addResult(locVals,
+                                    key,
+                                    swapEntry.value(),
+                                    skipVals,
+                                    false,
+                                    deserializeBinary,
                                     true,
-                                    null);
-
-                                if (getRes != null) {
-                                    v = getRes.value();
-                                    ver = getRes.version();
-                                }
-                            }
-                            else {
-                                v = entry.innerGet(null,
-                                    null,
-                                    /*swap*/true,
-                                    /*read-through*/false,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    /*temporary*/false,
-                                    subjId,
                                     null,
-                                    taskName,
-                                    expiry,
-                                    !deserializeBinary);
-                            }
-
-                            // Entry was not in memory or in swap, so we remove it from cache.
-                            if (v == null) {
-                                GridCacheVersion obsoleteVer = context().versions().next();
-
-                                if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
-                                    removeEntry(entry);
-
-                                success = false;
+                                    swapEntry.version(),
+                                    0,
+                                    0,
+                                    needVer);
+
+                                if (evt) {
+                                    ctx.events().readEvent(key,
+                                        null,
+                                        swapEntry.value(),
+                                        subjId,
+                                        taskName,
+                                        !deserializeBinary);
+                                }
                             }
                             else
-                                ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true,
-                                    getRes, ver, 0, 0, needVer);
+                                success = false;
                         }
                         else
                             success = false;
-
-                        break; // While.
                     }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        // No-op, retry.
-                    }
-                    catch (GridDhtInvalidPartitionException ignored) {
-                        success = false;
+                    else {
+                        GridCacheEntryEx entry = null;
+
+                        while (true) {
+                            try {
+                                entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
+
+                                // If our DHT cache do has value, then we peek it.
+                                if (entry != null) {
+                                    boolean isNew = entry.isNewLocked();
+
+                                    EntryGetResult getRes = null;
+                                    CacheObject v = null;
+                                    GridCacheVersion ver = null;
+
+                                    if (needVer) {
+                                        getRes = entry.innerGetVersioned(
+                                            null,
+                                            null,
+                                            /*swap*/true,
+                                            /*unmarshal*/true,
+                                            /*update-metrics*/false,
+                                            /*event*/evt,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiry,
+                                            true,
+                                            null);
+
+                                        if (getRes != null) {
+                                            v = getRes.value();
+                                            ver = getRes.version();
+                                        }
+                                    }
+                                    else {
+                                        v = entry.innerGet(null,
+                                            null,
+                                            /*swap*/true,
+                                            /*read-through*/false,
+                                            /*update-metrics*/false,
+                                            /*event*/evt,
+                                            /*temporary*/false,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiry,
+                                            !deserializeBinary);
+                                    }
+
+                                    // Entry was not in memory or in swap, so we remove it from cache.
+                                    if (v == null) {
+                                        if (isNew && entry.markObsoleteIfEmpty(context().versions().next()))
+                                            removeEntry(entry);
+
+                                        success = false;
+                                    }
+                                    else {
+                                        ctx.addResult(locVals,
+                                            key,
+                                            v,
+                                            skipVals,
+                                            false,
+                                            deserializeBinary,
+                                            true,
+                                            getRes,
+                                            ver,
+                                            0,
+                                            0,
+                                            needVer);
+                                    }
+                                }
+                                else
+                                    success = false;
 
-                        break; // While.
-                    }
-                    catch (IgniteCheckedException e) {
-                        return new GridFinishedFuture<>(e);
-                    }
-                    finally {
-                        if (entry != null)
-                            ctx.evicts().touch(entry, topVer);
+                                break; // While.
+                            }
+                            catch (GridCacheEntryRemovedException ignored) {
+                                // No-op, retry.
+                            }
+                            catch (GridDhtInvalidPartitionException ignored) {
+                                success = false;
+
+                                break; // While.
+                            }
+                            finally {
+                                if (entry != null)
+                                    ctx.evicts().touch(entry, topVer);
+                            }
+                        }
+
+                        if (!success)
+                            break;
+                        else if (!skipVals && ctx.config().isStatisticsEnabled())
+                            metrics0().onRead(true);
                     }
                 }
 
-                if (!success)
-                    break;
-                else if (!skipVals && ctx.config().isStatisticsEnabled())
-                    metrics0().onRead(true);
-            }
+                if (success) {
+                    sendTtlUpdateRequest(expiry);
 
-            if (success) {
-                sendTtlUpdateRequest(expiry);
+                    return new GridFinishedFuture<>(locVals);
+                }
 
-                return new GridFinishedFuture<>(locVals);
+                if (expiry != null)
+                    expiry.reset();
+            }
+            catch (IgniteCheckedException e) {
+                return new GridFinishedFuture<>(e);
             }
         }
 
-        if (expiry != null)
-            expiry.reset();
-
         // Either reload or not all values are available locally.
         GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx,
             keys,

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index c8556e5..4b1dd9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
@@ -451,121 +452,162 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
         // Optimisation: try to resolve value locally and escape 'get future' creation.
         if (!forcePrimary) {
-            Map<K, V> locVals = null;
+            try {
+                Map<K, V> locVals = null;
 
-            boolean success = true;
+                boolean success = true;
+                boolean offheapRead = ctx.offheapRead(expiryPlc, false);
+                boolean evt = !skipVals;
 
-            // Optimistically expect that all keys are available locally (avoid creation of get future).
-            for (KeyCacheObject key : keys) {
-                GridCacheEntryEx entry = null;
-
-                while (true) {
-                    try {
-                        entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
-
-                        // If our DHT cache do has value, then we peek it.
-                        if (entry != null) {
-                            boolean isNew = entry.isNewLocked();
-
-                            EntryGetResult getRes = null;
-                            CacheObject v = null;
-                            GridCacheVersion ver = null;
-
-                            if (needVer) {
-                                getRes = entry.innerGetVersioned(
-                                    null,
-                                    null,
-                                    /*swap*/true,
-                                    /*unmarshal*/true,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary,
-                                    null);
-
-                                if (getRes != null) {
-                                    v = getRes.value();
-                                    ver = getRes.version();
-                                }
-                            }
-                            else {
-                                v = entry.innerGet(
-                                    null,
-                                    null,
-                                    /*swap*/true,
-                                    /*read-through*/false,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    /*temporary*/false,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary);
-                            }
-
-                            // Entry was not in memory or in swap, so we remove it from cache.
-                            if (v == null) {
-                                GridCacheVersion obsoleteVer = context().versions().next();
+                // Optimistically expect that all keys are available locally (avoid creation of get future).
+                for (KeyCacheObject key : keys) {
+                    if (offheapRead) {
+                        GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(key);
 
-                                if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
-                                    removeEntry(entry);
-
-                                success = false;
-                            }
-                            else {
-                                if (locVals == null)
-                                    locVals = U.newHashMap(keys.size());
+                        if (swapEntry != null) {
+                            long expireTime = swapEntry.expireTime();
 
+                            if (expireTime == 0 || expireTime < U.currentTimeMillis()) {
                                 ctx.addResult(locVals,
                                     key,
-                                    v,
+                                    swapEntry.value(),
                                     skipVals,
-                                    keepCacheObj,
+                                    false,
                                     deserializeBinary,
                                     true,
-                                    getRes,
-                                    ver,
+                                    null,
+                                    swapEntry.version(),
                                     0,
                                     0,
                                     needVer);
+
+                                if (evt) {
+                                    ctx.events().readEvent(key,
+                                        null,
+                                        swapEntry.value(),
+                                        subjId,
+                                        taskName,
+                                        !deserializeBinary);
+                                }
                             }
+                            else
+                                success = false;
                         }
                         else
                             success = false;
-
-                        break; // While.
                     }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        // No-op, retry.
-                    }
-                    catch (GridDhtInvalidPartitionException ignored) {
-                        success = false;
+                    else {
+                        GridCacheEntryEx entry = null;
+
+                        while (true) {
+                            try {
+                                entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
+
+                                // If our DHT cache has the value, then we peek it.
+                                if (entry != null) {
+                                    boolean isNew = entry.isNewLocked();
+
+                                    EntryGetResult getRes = null;
+                                    CacheObject v = null;
+                                    GridCacheVersion ver = null;
+
+                                    if (needVer) {
+                                        getRes = entry.innerGetVersioned(
+                                            null,
+                                            null,
+                                            /*swap*/true,
+                                            /*unmarshal*/true,
+                                            /*update-metrics*/false,
+                                            /*event*/evt,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiryPlc,
+                                            !deserializeBinary,
+                                            null);
+
+                                        if (getRes != null) {
+                                            v = getRes.value();
+                                            ver = getRes.version();
+                                        }
+                                    }
+                                    else {
+                                        v = entry.innerGet(
+                                            null,
+                                            null,
+                                            /*swap*/true,
+                                            /*read-through*/false,
+                                            /*update-metrics*/false,
+                                            /*event*/evt,
+                                            /*temporary*/false,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiryPlc,
+                                            !deserializeBinary);
+                                    }
+
+                                    // Entry was not in memory or in swap, so we remove it from cache.
+                                    if (v == null) {
+                                        GridCacheVersion obsoleteVer = context().versions().next();
+
+                                        if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
+                                            removeEntry(entry);
+
+                                        success = false;
+                                    }
+                                    else {
+                                        if (locVals == null)
+                                            locVals = U.newHashMap(keys.size());
+
+                                        ctx.addResult(locVals,
+                                            key,
+                                            v,
+                                            skipVals,
+                                            keepCacheObj,
+                                            deserializeBinary,
+                                            true,
+                                            getRes,
+                                            ver,
+                                            0,
+                                            0,
+                                            needVer);
+                                    }
+                                }
+                                else
+                                    success = false;
 
-                        break; // While.
-                    }
-                    catch (IgniteCheckedException e) {
-                        return new GridFinishedFuture<>(e);
-                    }
-                    finally {
-                        if (entry != null)
-                            context().evicts().touch(entry, topVer);
+                                break; // While.
+                            }
+                            catch (GridCacheEntryRemovedException ignored) {
+                                // No-op, retry.
+                            }
+                            catch (GridDhtInvalidPartitionException ignored) {
+                                success = false;
+
+                                break; // While.
+                            }
+                            finally {
+                                if (entry != null)
+                                    context().evicts().touch(entry, topVer);
+                            }
+                        }
                     }
-                }
 
-                if (!success)
-                    break;
-                else if (!skipVals && ctx.config().isStatisticsEnabled())
-                    ctx.cache().metrics0().onRead(true);
-            }
+                    if (!success)
+                        break;
+                    else if (!skipVals && ctx.config().isStatisticsEnabled())
+                        ctx.cache().metrics0().onRead(true);
+                }
 
-            if (success) {
-                sendTtlUpdateRequest(expiryPlc);
+                if (success) {
+                    sendTtlUpdateRequest(expiryPlc);
 
-                return new GridFinishedFuture<>(locVals);
+                    return new GridFinishedFuture<>(locVals);
+                }
+            }
+            catch (IgniteCheckedException e) {
+                return new GridFinishedFuture<>(e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index f86df2f..1f66fdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCachePreloader;
 import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
@@ -397,97 +398,149 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
         final IgniteCacheExpiryPolicy expiry = expiryPolicy(opCtx != null ? opCtx.expiry() : null);
 
         boolean success = true;
+        final boolean offheapRead = ctx.offheapRead(expiry, false);
+        final boolean evt = !skipVals;
 
         for (K key : keys) {
             if (key == null)
                 throw new NullPointerException("Null key.");
 
-            GridCacheEntryEx entry = null;
-
             KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
 
-            while (true) {
-                try {
-                    entry = swapOrOffheap ? entryEx(cacheKey) : peekEx(cacheKey);
+            boolean skipEntry;
 
-                    if (entry != null) {
-                        CacheObject v;
+            if (offheapRead) {
+                skipEntry = true;
 
-                        if (needVer) {
-                            EntryGetResult res = entry.innerGetVersioned(
-                                null,
-                                null,
-                                /*swap*/swapOrOffheap,
-                                /*unmarshal*/true,
-                                /**update-metrics*/false,
-                                /*event*/!skipVals,
-                                subjId,
-                                null,
-                                taskName,
-                                expiry,
-                                !deserializeBinary,
-                                null);
-
-                            if (res != null) {
-                                ctx.addResult(
-                                    vals,
-                                    cacheKey,
-                                    res,
-                                    skipVals,
-                                    false,
-                                    deserializeBinary,
-                                    true,
-                                    needVer);
-                            }
-                            else
-                                success = false;
-                        }
-                        else {
-                            v = entry.innerGet(
-                                null,
+                GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(cacheKey);
+
+                if (swapEntry != null) {
+                    long expireTime = swapEntry.expireTime();
+
+                    if (expireTime == 0 || expireTime < U.currentTimeMillis()) {
+                        ctx.addResult(vals,
+                            cacheKey,
+                            swapEntry.value(),
+                            skipVals,
+                            false,
+                            deserializeBinary,
+                            true,
+                            null,
+                            swapEntry.version(),
+                            0,
+                            0,
+                            needVer);
+
+                        if (configuration().isStatisticsEnabled() && !skipVals)
+                            metrics0().onRead(true);
+
+                        if (evt) {
+                            ctx.events().readEvent(cacheKey,
                                 null,
-                                /*swap*/swapOrOffheap,
-                                /*read-through*/false,
-                                /**update-metrics*/true,
-                                /**event*/!skipVals,
-                                /**temporary*/false,
+                                swapEntry.value(),
                                 subjId,
-                                null,
                                 taskName,
-                                expiry,
                                 !deserializeBinary);
+                        }
+                    }
+                    else
+                        skipEntry = false;
+                }
+                else
+                    success = false;
+
+                if (skipEntry && !success && !storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
+                    metrics0().onRead(false);
+            }
+            else
+                skipEntry = false;
+
+            if (!skipEntry) {
+                GridCacheEntryEx entry = null;
+
+                CacheObject v;
+
+                while (true) {
+                    try {
+                        entry = swapOrOffheap ? entryEx(cacheKey) : peekEx(cacheKey);
 
-                            if (v != null) {
-                                ctx.addResult(vals,
-                                    cacheKey,
-                                    v,
-                                    skipVals,
-                                    false,
-                                    deserializeBinary,
-                                    true,
+                        if (entry != null) {
+                            if (needVer) {
+                                EntryGetResult res = entry.innerGetVersioned(
+                                    null,
+                                    null,
+                                    /*swap*/swapOrOffheap,
+                                    /*unmarshal*/true,
+                                    /*update-metrics*/false,
+                                    /*event*/!skipVals,
+                                    subjId,
                                     null,
-                                    0,
-                                    0);
+                                    taskName,
+                                    expiry,
+                                    !deserializeBinary,
+                                    null);
+
+                                if (res != null) {
+                                    ctx.addResult(
+                                        vals,
+                                        cacheKey,
+                                        res,
+                                        skipVals,
+                                        false,
+                                        deserializeBinary,
+                                        true,
+                                        needVer);
+                                }
+                                else
+                                    success = false;
+                            }
+                            else {
+                                v = entry.innerGet(
+                                    null,
+                                    null,
+                                    /*swap*/swapOrOffheap,
+                                    /*read-through*/false,
+                                    /*update-metrics*/true,
+                                    /*event*/!skipVals,
+                                    /*temporary*/false,
+                                    subjId,
+                                    null,
+                                    taskName,
+                                    expiry,
+                                    !deserializeBinary);
+
+                                if (v != null) {
+                                    ctx.addResult(vals,
+                                        cacheKey,
+                                        v,
+                                        skipVals,
+                                        false,
+                                        deserializeBinary,
+                                        true,
+                                        null,
+                                        0,
+                                        0);
+                                }
+                                else
+                                    success = false;
                             }
-                            else
-                                success = false;
                         }
-                    }
-                    else {
-                        if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
-                            metrics0().onRead(false);
+                        else {
+                            if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
+                                metrics0().onRead(false);
 
-                        success = false;
-                    }
+                            success = false;
+                        }
 
-                    break; // While.
-                }
-                catch (GridCacheEntryRemovedException ignored) {
-                    // No-op, retry.
-                }
-                finally {
-                    if (entry != null)
-                        ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
+                        break; // While.
+                    }
+                    catch (GridCacheEntryRemovedException ignored) {
+                        // No-op, retry.
+                    }
+                    finally {
+                        if (entry != null)
+                            ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
+                    }
                 }
 
                 if (!success && storeEnabled)

http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
index faa63b3..0dbfc7e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -27,7 +30,6 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMemoryMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -93,37 +95,51 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testAtomicGet() throws Exception {
-        doGet(ATOMIC, ONHEAP_TIERED);
+        doGet(ATOMIC, ONHEAP_TIERED, false);
+
+        doGet(ATOMIC, ONHEAP_TIERED, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAtomicGetOffheap() throws Exception {
-        doGet(ATOMIC, OFFHEAP_TIERED);
+        doGet(ATOMIC, OFFHEAP_TIERED, false);
+
+        doGet(ATOMIC, OFFHEAP_TIERED, true);
     }
 
     /**
      * @throws Exception If failed.
      */
-    private void doGet(CacheAtomicityMode atomicityMode, CacheMemoryMode memoryMode) throws Exception {
+    private void doGet(CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode,
+        final boolean getAll) throws Exception {
         Ignite srv = ignite(0);
 
         Ignite client = ignite(1);
 
         final IgniteCache cache = client.createCache(cacheConfiguration(atomicityMode, memoryMode));
 
+        final Map<Object, Object> data = new HashMap<>();
+
+        data.put(1, 1);
+        data.put(2, 2);
+
         try {
             // Get from compute closure.
             {
-                cache.put(1, 1);
+                cache.putAll(data);
 
                 hangLatch = new CountDownLatch(1);
                 processorStartLatch = new CountDownLatch(1);
 
                 IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
                     @Override public Void call() throws Exception {
-                        cache.invoke(1, new HangEntryProcessor());
+                        if (getAll)
+                            cache.invokeAll(data.keySet(), new HangEntryProcessor());
+                        else
+                            cache.invoke(1, new HangEntryProcessor());
 
                         return null;
                     }
@@ -134,7 +150,14 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
 
                     assertTrue(wait);
 
-                    assertEquals(1, client.compute().affinityCall(cache.getName(), 1, new GetClosure(1, cache.getName())));
+                    if (getAll) {
+                        assertEquals(data, client.compute().affinityCall(cache.getName(), 1,
+                            new GetAllClosure(data.keySet(), cache.getName())));
+                    }
+                    else {
+                        assertEquals(1, client.compute().affinityCall(cache.getName(), 1,
+                            new GetClosure(1, cache.getName())));
+                    }
 
                     hangLatch.countDown();
 
@@ -147,14 +170,17 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
 
             // Local get.
             {
-                cache.put(1, 1);
+                cache.putAll(data);
 
                 hangLatch = new CountDownLatch(1);
                 processorStartLatch = new CountDownLatch(1);
 
                 IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
                     @Override public Void call() throws Exception {
-                        cache.invoke(1, new HangEntryProcessor());
+                        if (getAll)
+                            cache.invokeAll(data.keySet(), new HangEntryProcessor());
+                        else
+                            cache.invoke(1, new HangEntryProcessor());
 
                         return null;
                     }
@@ -165,7 +191,10 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
 
                     assertTrue(wait);
 
-                    assertEquals(1, srv.cache(cache.getName()).get(1));
+                    if (getAll)
+                        assertEquals(data, srv.cache(cache.getName()).getAll(data.keySet()));
+                    else
+                        assertEquals(1, srv.cache(cache.getName()).get(1));
 
                     hangLatch.countDown();
 
@@ -249,4 +278,32 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
             return ignite.cache(cacheName).get(key);
         }
     }
+
+    /**
+     * Closure that reads the given keys from the named cache via {@code getAll}.
+     */
+    public static class GetAllClosure implements IgniteCallable<Object> {
+        /** Ignite instance, injected via {@code @IgniteInstanceResource}. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Keys to read. */
+        private final Set<Object> keys;
+
+        /** Name of the cache to read from. */
+        private final String cacheName;
+
+        /**
+         * @param keys Keys.
+         */
+        public GetAllClosure(Set<Object> keys, String cacheName) {
+            this.keys = keys;
+            this.cacheName = cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            return ignite.cache(cacheName).getAll(keys);
+        }
+    }
 }


[4/4] ignite git commit: ignite-4932 WIP

Posted by sb...@apache.org.
ignite-4932 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f2d9ea0c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f2d9ea0c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f2d9ea0c

Branch: refs/heads/ignite-4932
Commit: f2d9ea0cc2a1b6fd66d00bc386f36c53757fe28c
Parents: 7d1d975
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 12 17:23:13 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 12 17:44:46 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java        | 18 +++++++++++++++++-
 .../processors/cache/GridCacheContext.java        |  5 ++++-
 .../processors/cache/GridCacheEventManager.java   | 16 ++++++++++++----
 .../processors/cache/GridCacheMapEntry.java       |  2 +-
 .../processors/cache/IgniteCacheExpiryPolicy.java |  5 +++++
 .../cache/IgniteCacheNoSyncForGetTest.java        | 11 ++++++-----
 .../ignite/testsuites/IgniteCacheTestSuite2.java  |  7 +++++--
 7 files changed, 50 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f2d9ea0c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 5042f77..c70541a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -5838,8 +5838,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 return null;
 
             return new CacheExpiryPolicy() {
+                private Long accessTtl;
+
                 @Override public long forAccess() {
-                    return CU.toTtl(expiryPlc.getExpiryForAccess());
+                    if (accessTtl == null)
+                        accessTtl = CU.toTtl(expiryPlc.getExpiryForAccess());
+
+                    return accessTtl;
+                }
+
+                @Override public boolean hasAccessTtl() {
+                    if (accessTtl == null)
+                        accessTtl = CU.toTtl(expiryPlc.getExpiryForAccess());
+
+                    return accessTtl != CU.TTL_NOT_CHANGED;
                 }
 
                 @Override public long forCreate() {
@@ -5870,6 +5882,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     return accessTtl;
                 }
 
+                @Override public boolean hasAccessTtl() {
+                    return accessTtl != CU.TTL_NOT_CHANGED;
+                }
+
                 /** {@inheritDoc} */
                 @Override public long forUpdate() {
                     return CU.TTL_NOT_CHANGED;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2d9ea0c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 0985161..b4668a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2063,7 +2063,10 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return
      */
     public boolean offheapRead(IgniteCacheExpiryPolicy expiryPlc, boolean readers) {
-        return offheapTiered() && isSwapOrOffheapEnabled() && expiryPlc == null && !readers;
+        return offheapTiered() &&
+            isSwapOrOffheapEnabled() &&
+            (expiryPlc == null || !expiryPlc.hasAccessTtl()) &&
+            !readers;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2d9ea0c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index 8953b63..7a417d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -62,11 +62,19 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
         cctx.gridEvents().removeLocalEventListener(lsnr);
     }
 
+    /**
+     * @param key Key for event.
+     * @param tx Possible surrounding transaction.
+     * @param val Read value.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param keepBinary Keep binary flag.
+     */
     public void readEvent(KeyCacheObject key,
-        IgniteInternalTx tx,
-        CacheObject val,
-        UUID subjId,
-        String taskName,
+        @Nullable IgniteInternalTx tx,
+        @Nullable CacheObject val,
+        @Nullable UUID subjId,
+        @Nullable String taskName,
         boolean keepBinary) {
         if (isRecordable(EVT_CACHE_OBJECT_READ)) {
             addEvent(cctx.affinity().partition(key),

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2d9ea0c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index b9ebed3..00e5199 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -874,7 +874,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         Object res = null;
 
-        if (readerArgs == null && expiryPlc == null && !retVer && cctx.config().isEagerTtl()) {
+        if (readerArgs == null && (expiryPlc == null || !expiryPlc.hasAccessTtl()) && !retVer && cctx.config().isEagerTtl()) {
             // Fast heap get without 'synchronized'.
             CacheObject val0 = this.val;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2d9ea0c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
index f82c5f0..1d3e056 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
@@ -46,6 +46,11 @@ public interface IgniteCacheExpiryPolicy {
     public long forAccess();
 
     /**
+     * @return {@code True} if {@link #forAccess()} yields a TTL other than {@code CU.TTL_NOT_CHANGED}.
+     */
+    public boolean hasAccessTtl();
+
+    /**
      * Callback for ttl update on entry access.
      *
      * @param key Entry key.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2d9ea0c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
index 0dbfc7e..85212c5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
@@ -42,9 +42,10 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheMemoryMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
  *
@@ -268,7 +269,7 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
         /**
          * @param key Key.
          */
-        public GetClosure(int key, String cacheName) {
+        GetClosure(int key, String cacheName) {
             this.key = key;
             this.cacheName = cacheName;
         }
@@ -296,7 +297,7 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
         /**
          * @param keys Keys.
          */
-        public GetAllClosure(Set<Object> keys, String cacheName) {
+        GetAllClosure(Set<Object> keys, String cacheName) {
             this.keys = keys;
             this.cacheName = cacheName;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2d9ea0c/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index e37a8a1..6d0745d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheVariableTopologySelf
 import org.apache.ignite.internal.processors.cache.IgniteAtomicCacheEntryProcessorNodeJoinTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheEntryProcessorNodeJoinTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheIncrementTxTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheNoSyncForGetTest;
 import org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest;
 import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop;
 import org.apache.ignite.internal.processors.cache.OffheapCacheOnClientsTest;
@@ -114,11 +115,11 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePar
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedTxSingleThreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedTxTimeoutSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheRendezvousAffinityClientSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridPartitionedBackupLoadSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheStoreUpdateTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOffheapCacheStoreUpdateTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridPartitionedBackupLoadSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.NearCacheSyncUpdateTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.NoneRebalanceModeSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOffheapCacheStoreUpdateTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedEvictionSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedJobExecutionTest;
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalAtomicBasicStoreSelfTest;
@@ -278,6 +279,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
         suite.addTest(new TestSuite(GridNearCacheStoreUpdateTest.class));
         suite.addTest(new TestSuite(GridNearOffheapCacheStoreUpdateTest.class));
 
+        suite.addTest(new TestSuite(IgniteCacheNoSyncForGetTest.class));
+
         return suite;
     }
 }