You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/12 14:44:56 UTC
[1/4] ignite git commit: IGNITE-4907: Fixed excessive service
instances can be started with dynamic deployment. This closes #1766.
Repository: ignite
Updated Branches:
refs/heads/ignite-4932 f9f4256ae -> f2d9ea0cc
IGNITE-4907: Fixed excessive service instances can be started with dynamic deployment. This closes #1766.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0f7ef742
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0f7ef742
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0f7ef742
Branch: refs/heads/ignite-4932
Commit: 0f7ef74216fab64f5d1d2b6d432b552b7fe40d2f
Parents: 01ceeb1
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Wed Apr 12 13:01:25 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Wed Apr 12 13:01:25 2017 +0300
----------------------------------------------------------------------
.../service/GridServiceProcessor.java | 2 +-
...ServiceProcessorMultiNodeConfigSelfTest.java | 95 +++++++++++++++++---
.../GridServiceProcessorMultiNodeSelfTest.java | 61 +++++++++++++
3 files changed, 146 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f7ef742/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index d0b2733..37bffc3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -972,7 +972,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
int perNodeCnt = totalCnt != 0 ? totalCnt / size : maxPerNodeCnt;
int remainder = totalCnt != 0 ? totalCnt % size : 0;
- if (perNodeCnt > maxPerNodeCnt && maxPerNodeCnt != 0) {
+ if (perNodeCnt >= maxPerNodeCnt && maxPerNodeCnt != 0) {
perNodeCnt = maxPerNodeCnt;
remainder = 0;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f7ef742/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeConfigSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeConfigSelfTest.java
index 1bd3b03..9da62c0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeConfigSelfTest.java
@@ -17,11 +17,13 @@
package org.apache.ignite.internal.processors.service;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
-import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.testframework.GridTestUtils;
@@ -38,6 +40,9 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
/** Node singleton name. */
private static final String NODE_SINGLE_BUT_CLIENT = "serviceConfigEachNodeButClient";
+ /** Node singleton name. */
+ private static final String NODE_SINGLE_WITH_LIMIT = "serviceConfigWithLimit";
+
/** Affinity service name. */
private static final String AFFINITY = "serviceConfigAffinity";
@@ -51,7 +56,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
/** {@inheritDoc} */
@Override protected ServiceConfiguration[] services() {
- ServiceConfiguration[] arr = new ServiceConfiguration[4];
+ List<ServiceConfiguration> cfgs = new ArrayList<>();
ServiceConfiguration cfg = new ServiceConfiguration();
@@ -60,7 +65,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
cfg.setTotalCount(1);
cfg.setService(new DummyService());
- arr[0] = cfg;
+ cfgs.add(cfg);
cfg = new ServiceConfiguration();
@@ -68,7 +73,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
cfg.setMaxPerNodeCount(1);
cfg.setService(new DummyService());
- arr[1] = cfg;
+ cfgs.add(cfg);
cfg = new ServiceConfiguration();
@@ -79,7 +84,7 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
cfg.setTotalCount(1);
cfg.setService(new AffinityService(AFFINITY_KEY));
- arr[2] = cfg;
+ cfgs.add(cfg);
cfg = new ServiceConfiguration();
@@ -88,9 +93,18 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
cfg.setNodeFilter(new CacheConfiguration.IgniteAllNodesPredicate());
cfg.setService(new DummyService());
- arr[3] = cfg;
+ cfgs.add(cfg);
+
+ cfg = new ServiceConfiguration();
- return arr;
+ cfg.setName(NODE_SINGLE_WITH_LIMIT);
+ cfg.setMaxPerNodeCount(1);
+ cfg.setTotalCount(nodeCount() + 1);
+ cfg.setService(new DummyService());
+
+ cfgs.add(cfg);
+
+ return cfgs.toArray(new ServiceConfiguration[cfgs.size()]);
}
/** {@inheritDoc} */
@@ -107,6 +121,8 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
DummyService.cancelled(NODE_SINGLE) == 0 &&
DummyService.started(NODE_SINGLE_BUT_CLIENT) == nodeCount() &&
DummyService.cancelled(NODE_SINGLE_BUT_CLIENT) == 0 &&
+ DummyService.started(NODE_SINGLE_WITH_LIMIT) >= nodeCount() &&
+ DummyService.cancelled(NODE_SINGLE_WITH_LIMIT) == 0 &&
actualCount(AFFINITY, randomGrid().services().serviceDescriptors()) == 1;
}
},
@@ -170,6 +186,59 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
finally {
stopExtraNodes(nodeCnt);
}
+
+ checkCount(AFFINITY, g.services().serviceDescriptors(), 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDeployLimits() throws Exception {
+ final Ignite g = randomGrid();
+
+ final String name = NODE_SINGLE_WITH_LIMIT;
+
+ waitForDeployment(name, nodeCount());
+
+ checkCount(name, g.services().serviceDescriptors(), nodeCount());
+
+ int extraNodes = 2;
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ DummyService.exeLatch(name, latch);
+
+ startExtraNodes(extraNodes);
+
+ try {
+ latch.await();
+
+ checkCount(name, g.services().serviceDescriptors(), nodeCount() + 1);
+ }
+ finally {
+ stopExtraNodes(extraNodes);
+ }
+
+ assertEquals(name, 1, DummyService.cancelled(name));
+
+ waitForDeployment(name, nodeCount());
+
+ checkCount(name, g.services().serviceDescriptors(), nodeCount());
+ }
+
+ /**
+ * @param srvcName Service name
+ * @param expectedDeps Expected number of service deployments
+ *
+ */
+ private boolean waitForDeployment(final String srvcName, final int expectedDeps) throws IgniteInterruptedCheckedException {
+ final Ignite g = randomGrid();
+
+ return GridTestUtils.waitForCondition(new GridAbsPredicateX() {
+ @Override public boolean applyx() {
+ return actualCount(srvcName, g.services().serviceDescriptors()) == expectedDeps;
+ }
+ }, 1500);
}
/**
@@ -212,10 +281,6 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
try {
latch.await();
- // Ensure service is deployed.
- assertNotNull(grid(nodeCount() + newNodes - 1).services()
- .serviceProxy(NODE_SINGLE_BUT_CLIENT, Service.class, false, 2000));
-
assertEquals(name, newNodes, DummyService.started(name));
assertEquals(name, 0, DummyService.cancelled(name));
@@ -224,6 +289,10 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
finally {
stopExtraNodes(newNodes);
}
+
+ waitForDeployment(name, nodeCount());
+
+ checkCount(name, g.services().serviceDescriptors(), nodeCount());
}
/**
@@ -253,5 +322,9 @@ public class GridServiceProcessorMultiNodeConfigSelfTest extends GridServiceProc
finally {
stopExtraNodes(servers + clients);
}
+
+ waitForDeployment(name, nodeCount());
+
+ checkCount(name, g.services().serviceDescriptors(), nodeCount());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/0f7ef742/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
index f7403dc..d133cf2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java
@@ -264,4 +264,65 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA
stopGrid("client");
}
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDeployLimits() throws Exception {
+ String name = "serviceWithLimitsUpdateTopology";
+
+ Ignite g = randomGrid();
+
+ final int totalInstances = nodeCount() + 1;
+
+ CountDownLatch latch = new CountDownLatch(nodeCount());
+
+ DummyService.exeLatch(name, latch);
+
+ ServiceConfiguration srvcCfg = new ServiceConfiguration();
+
+ srvcCfg.setName(name);
+ srvcCfg.setMaxPerNodeCount(1);
+ srvcCfg.setTotalCount(totalInstances);
+ srvcCfg.setService(new DummyService());
+
+ IgniteServices svcs = g.services().withAsync();
+
+ svcs.deploy(srvcCfg);
+
+ IgniteFuture<?> fut = svcs.future();
+
+ info("Deployed service: " + name);
+
+ fut.get();
+
+ info("Finished waiting for service future: " + name);
+
+ latch.await();
+
+ TestCase.assertEquals(name, nodeCount(), DummyService.started(name));
+ TestCase.assertEquals(name, 0, DummyService.cancelled(name));
+
+ checkCount(name, g.services().serviceDescriptors(), nodeCount());
+
+ int extraNodes = 2;
+
+ latch = new CountDownLatch(1);
+
+ DummyService.exeLatch(name, latch);
+
+ startExtraNodes(2);
+
+ try {
+ latch.await();
+
+ TestCase.assertEquals(name, totalInstances, DummyService.started(name));
+ TestCase.assertEquals(name, 0, DummyService.cancelled(name));
+
+ checkCount(name, g.services().serviceDescriptors(), totalInstances);
+ }
+ finally {
+ stopExtraNodes(extraNodes);
+ }
+ }
}
\ No newline at end of file
[3/4] ignite git commit: Merge remote-tracking branch
'remotes/community/ignite-1.9.2' into ignite-4932
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/community/ignite-1.9.2' into ignite-4932
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d1d9752
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d1d9752
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d1d9752
Branch: refs/heads/ignite-4932
Commit: 7d1d9752a5415a87c93526b99b3f3ed12860cc12
Parents: 59c9707 0f7ef74
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 12 17:11:14 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 12 17:11:14 2017 +0300
----------------------------------------------------------------------
.../service/GridServiceProcessor.java | 2 +-
...ServiceProcessorMultiNodeConfigSelfTest.java | 95 +++++++++++++++++---
.../GridServiceProcessorMultiNodeSelfTest.java | 61 +++++++++++++
3 files changed, 146 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
[2/4] ignite git commit: ignite-4932 WIP
Posted by sb...@apache.org.
ignite-4932 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59c9707c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59c9707c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59c9707c
Branch: refs/heads/ignite-4932
Commit: 59c9707ccef73d2cce5ba7171225be995c247276
Parents: f9f4256
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 12 16:00:12 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 12 17:00:40 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 152 ++++++++----
.../processors/cache/GridCacheContext.java | 9 +
.../processors/cache/GridCacheEventManager.java | 24 ++
.../processors/cache/GridCacheMapEntry.java | 45 ++--
.../processors/cache/GridCacheSwapManager.java | 22 ++
.../dht/GridPartitionedGetFuture.java | 152 +++++++-----
.../dht/GridPartitionedSingleGetFuture.java | 139 +++++++----
.../dht/atomic/GridDhtAtomicCache.java | 230 ++++++++++++-------
.../dht/colocated/GridDhtColocatedCache.java | 228 ++++++++++--------
.../local/atomic/GridLocalAtomicCache.java | 197 ++++++++++------
.../cache/IgniteCacheNoSyncForGetTest.java | 77 ++++++-
11 files changed, 832 insertions(+), 443 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 27a5750..5042f77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1908,80 +1908,130 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
Map<KeyCacheObject, EntryGetResult> misses = null;
+ boolean offheapRead = ctx.offheapRead(expiry, readerArgs != null);
+
for (KeyCacheObject key : keys) {
while (true) {
- GridCacheEntryEx entry = needEntry ? entryEx(key) : peekEx(key);
-
- if (entry == null) {
- if (!skipVals && ctx.config().isStatisticsEnabled())
- ctx.cache().metrics0().onRead(false);
-
- break;
- }
-
try {
- EntryGetResult res;
+ EntryGetResult res = null;
boolean evt = !skipVals;
boolean updateMetrics = !skipVals;
- if (storeEnabled) {
- res = entry.innerGetAndReserveForLoad(ctx.isSwapOrOffheapEnabled(),
- updateMetrics,
- evt,
- subjId,
- taskName,
- expiry,
- !deserializeBinary,
- readerArgs);
+ GridCacheEntryEx entry = null;
+
+ boolean skipEntry;
+
+ if (offheapRead) {
+ GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(key);
- assert res != null;
+ if (swapEntry != null) {
+ skipEntry = true;
- if (res.value() == null) {
- if (misses == null)
- misses = new HashMap<>();
+ long expireTime = swapEntry.expireTime();
- misses.put(key, res);
+ if (expireTime != 0) {
+ if (expireTime - U.currentTimeMillis() > 0) {
+ res = new EntryGetWithTtlResult(swapEntry.value(),
+ swapEntry.version(),
+ false,
+ expireTime,
+ swapEntry.ttl());
+ }
+ else
+ skipEntry = false; // Do not skip entry if need process expiration.
+ }
+ else
+ res = new EntryGetResult(swapEntry.value(), swapEntry.version(), false);
+ }
+ else
+ skipEntry = !storeEnabled;
+
+ if (skipEntry) {
+ if (evt) {
+ ctx.events().readEvent(key,
+ null,
+ swapEntry != null ? swapEntry.value() : null,
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
- res = null;
+ if (updateMetrics && ctx.cache().configuration().isStatisticsEnabled())
+ ctx.cache().metrics0().onRead(swapEntry != null);
}
}
- else {
- if (needVer || readerArgs != null) {
- res = entry.innerGetVersioned(
- null,
- null,
- ctx.isSwapOrOffheapEnabled(),
- /*unmarshal*/true,
+ else
+ skipEntry = false;
+
+ if (!skipEntry) {
+ entry = needEntry ? entryEx(key) : peekEx(key);
+
+ if (entry == null) {
+ if (!skipVals && ctx.config().isStatisticsEnabled())
+ ctx.cache().metrics0().onRead(false);
+
+ break;
+ }
+
+ if (storeEnabled) {
+ res = entry.innerGetAndReserveForLoad(ctx.isSwapOrOffheapEnabled(),
updateMetrics,
evt,
subjId,
- null,
taskName,
expiry,
!deserializeBinary,
readerArgs);
+
+ assert res != null;
+
+ if (res.value() == null) {
+ if (misses == null)
+ misses = new HashMap<>();
+
+ misses.put(key, res);
+
+ res = null;
+ }
}
else {
- CacheObject val = entry.innerGet(
- null,
- null,
- ctx.isSwapOrOffheapEnabled(),
- false,
- updateMetrics,
- evt,
- false,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary);
+ if (needVer || readerArgs != null) {
+ res = entry.innerGetVersioned(
+ null,
+ null,
+ ctx.isSwapOrOffheapEnabled(),
+ /*unmarshal*/true,
+ updateMetrics,
+ evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary,
+ readerArgs);
+ }
+ else {
+ CacheObject val = entry.innerGet(
+ null,
+ null,
+ ctx.isSwapOrOffheapEnabled(),
+ false,
+ updateMetrics,
+ evt,
+ false,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+
+ res = val != null ? new EntryGetResult(val, null) : null;
+ }
- res = val != null ? new EntryGetResult(val, null) : null;
+ if (res == null)
+ ctx.evicts().touch(entry, topVer);
}
-
- if (res == null)
- ctx.evicts().touch(entry, topVer);
}
if (res != null) {
@@ -1994,7 +2044,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
true,
needVer);
- if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
+ if (entry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)))
ctx.evicts().touch(entry, topVer);
if (keysSize == 1)
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 3b44b50..0985161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2058,6 +2058,15 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * @param expiryPlc
+ * @param readers
+ * @return
+ */
+ public boolean offheapRead(IgniteCacheExpiryPolicy expiryPlc, boolean readers) {
+ return offheapTiered() && isSwapOrOffheapEnabled() && expiryPlc == null && !readers;
+ }
+
+ /**
* @param part Partition.
* @param affNodes Affinity nodes.
* @param topVer Topology version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index 1c18738..8953b63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
import static org.apache.ignite.events.EventType.EVT_CACHE_STARTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED;
@@ -61,6 +62,29 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
cctx.gridEvents().removeLocalEventListener(lsnr);
}
+ public void readEvent(KeyCacheObject key,
+ IgniteInternalTx tx,
+ CacheObject val,
+ UUID subjId,
+ String taskName,
+ boolean keepBinary) {
+ if (isRecordable(EVT_CACHE_OBJECT_READ)) {
+ addEvent(cctx.affinity().partition(key),
+ key,
+ tx,
+ null,
+ EVT_CACHE_OBJECT_READ,
+ val,
+ val != null,
+ val,
+ val != null,
+ subjId,
+ null,
+ taskName,
+ keepBinary);
+ }
+ }
+
/**
* @param part Partition.
* @param key Key for the event.
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 34f8b96..b9ebed3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -874,31 +874,36 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Object res = null;
- // TODO IGNITE-4932: metrics/events.
+ if (readerArgs == null && expiryPlc == null && !retVer && cctx.config().isEagerTtl()) {
+ // Fast heap get without 'synchronized'.
+ CacheObject val0 = this.val;
- if (readerArgs == null && expiryPlc == null) {
- if (!retVer && cctx.config().isEagerTtl()) { // Fast heap get.
- CacheObject val0 = this.val;
-
- if (val0 != null)
- return val0;
- }
+ if (val0 != null) {
+ if (updateMetrics && cctx.cache().configuration().isStatisticsEnabled())
+ cctx.cache().metrics0().onRead(true);
- if (cctx.isSwapOrOffheapEnabled() && readSwap) {
- GridCacheSwapEntry swapEntry = cctx.swap().read(this, false, true, true, false);
+ if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+ transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
- if (swapEntry != null) {
- long expireTime = swapEntry.expireTime();
+ GridCacheMvcc mvcc = mvccExtras();
- if (expireTime != 0) {
- if (expireTime - U.currentTimeMillis() > 0) {
- return retVer ? new EntryGetWithTtlResult(val, ver, false, expireTime, swapEntry.ttl()) :
- swapEntry.value();
- }
- }
- else
- return retVer ? new EntryGetResult(val, ver, false) : swapEntry.value();
+ cctx.events().addEvent(
+ partition(),
+ key,
+ tx,
+ mvcc != null ? mvcc.anyOwner() : null,
+ EVT_CACHE_OBJECT_READ,
+ val0,
+ true,
+ val0,
+ true,
+ subjId,
+ transformClo != null ? transformClo.getClass().getName() : null,
+ taskName,
+ keepBinary);
}
+
+ return val0;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 159b3b8..07edaff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -819,6 +819,28 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
}
/**
+ * @param key Key.
+ * @return Read value.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public GridCacheSwapEntry readSwapEntry(KeyCacheObject key) throws IgniteCheckedException {
+ assert offheapEnabled || swapEnabled;
+
+ GridCacheSwapEntry entry = read(key,
+ key.valueBytes(cctx.cacheObjectContext()),
+ cctx.affinity().partition(key),
+ false,
+ true,
+ true,
+ false);
+
+ assert entry == null || entry.value() != null : entry;
+ assert entry == null || entry.version() != null : entry;
+
+ return entry;
+ }
+
+ /**
* @param entry Entry to read.
* @return Read value address.
* @throws IgniteCheckedException If read failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 519239a..798e2dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -437,81 +438,118 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
GridDhtCacheAdapter<K, V> cache = cache();
- while (true) {
- GridCacheEntryEx entry;
+ boolean offheapRead = cctx.offheapRead(expiryPlc, false);
+ boolean evt = !skipVals;
+ while (true) {
try {
- entry = cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key);
+ boolean skipEntry;
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
+ EntryGetResult getRes = null;
+ CacheObject v = null;
+ GridCacheVersion ver = null;
- EntryGetResult getRes = null;
- CacheObject v = null;
- GridCacheVersion ver = null;
+ if (offheapRead) {
+ skipEntry = true;
- if (needVer) {
- getRes = entry.innerGetVersioned(
- null,
- null,
- /*swap*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary,
- null);
+ GridCacheSwapEntry swapEntry = cctx.swap().readSwapEntry(key);
+
+ if (swapEntry != null) {
+ long expireTime = swapEntry.expireTime();
+
+ if (expireTime == 0 || expireTime < U.currentTimeMillis()) {
+ v = swapEntry.value();
- if (getRes != null) {
- v = getRes.value();
- ver = getRes.version();
+ if (needVer)
+ ver = swapEntry.version();
}
+ else
+ skipEntry = false;
}
- else {
- v = entry.innerGet(
- null,
+
+ if (skipEntry && evt) {
+ cctx.events().readEvent(key,
null,
- /*swap*/true,
- /*read-through*/false,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
+ swapEntry != null ? swapEntry.value() : null,
subjId,
- null,
taskName,
- expiryPlc,
!deserializeBinary);
}
+ }
+ else
+ skipEntry = false;
+
+ if (!skipEntry) {
+ GridCacheEntryEx entry =
+ cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key);
+
+ // If our DHT cache do has value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ if (needVer) {
+ getRes = entry.innerGetVersioned(
+ null,
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary,
+ null);
+
+ if (getRes != null) {
+ v = getRes.value();
+ ver = getRes.version();
+ }
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+ }
- cache.context().evicts().touch(entry, topVer);
+ cache.context().evicts().touch(entry, topVer);
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- if (isNew && entry.markObsoleteIfEmpty(ver))
- cache.removeEntry(entry);
- }
- else {
- cctx.addResult(locVals,
- key,
- v,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- true,
- getRes,
- ver,
- 0,
- 0,
- needVer);
-
- return true;
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ if (isNew && entry.markObsoleteIfEmpty(ver))
+ cache.removeEntry(entry);
+ }
}
}
+ if (v != null) {
+ cctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ true,
+ getRes,
+ ver,
+ 0,
+ 0,
+ needVer);
+
+ return true;
+ }
+
boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
// Entry not found, do not continue search if topology did not change and there is no store.
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index a3f6b72..11d4fc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
@@ -360,74 +361,110 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
GridDhtCacheAdapter colocated = cctx.dht();
- while (true) {
- GridCacheEntryEx entry;
+ boolean offheapRead = cctx.offheapRead(expiryPlc, false);
+ boolean evt = !skipVals;
+ while (true) {
try {
- entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
- colocated.peekEx(key);
+ CacheObject v = null;
+ GridCacheVersion ver = null;
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
+ boolean skipEntry;
- CacheObject v = null;
- GridCacheVersion ver = null;
+ if (offheapRead) {
+ skipEntry = true;
- if (needVer) {
- EntryGetResult res = entry.innerGetVersioned(
- null,
- null,
- /*swap*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- true,
- null);
+ GridCacheSwapEntry swapEntry = cctx.swap().readSwapEntry(key);
- if (res != null) {
- v = res.value();
- ver = res.version();
+ if (swapEntry != null) {
+ long expireTime = swapEntry.expireTime();
+
+ if (expireTime == 0 || expireTime < U.currentTimeMillis()) {
+ v = swapEntry.value();
+
+ if (needVer)
+ ver = swapEntry.version();
}
+ else
+ skipEntry = false;
}
- else {
- v = entry.innerGet(
- null,
+
+ if (skipEntry && evt) {
+ cctx.events().readEvent(key,
null,
- /*swap*/true,
- /*read-through*/false,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
+ swapEntry != null ? swapEntry.value() : null,
subjId,
- null,
taskName,
- expiryPlc,
- true);
+ !deserializeBinary);
}
+ }
+ else
+ skipEntry = false;
+
+ if (!skipEntry) {
+ GridCacheEntryEx entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
+ colocated.peekEx(key);
+
+ // If our DHT cache do has value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ if (needVer) {
+ EntryGetResult res = entry.innerGetVersioned(
+ null,
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ true,
+ null);
+
+ if (res != null) {
+ v = res.value();
+ ver = res.version();
+ }
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ true);
+ }
- colocated.context().evicts().touch(entry, topVer);
+ colocated.context().evicts().touch(entry, topVer);
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- if (isNew && entry.markObsoleteIfEmpty(ver))
- colocated.removeEntry(entry);
+ if (v == null) {
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (isNew && entry.markObsoleteIfEmpty(ver))
+ colocated.removeEntry(entry);
+ }
}
- else {
- if (!skipVals && cctx.config().isStatisticsEnabled())
- cctx.cache().metrics0().onRead(true);
+ }
- if (!skipVals)
- setResult(v, ver);
- else
- setSkipValueResult(true, ver);
+ if (v != null) {
+ if (!skipVals && cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onRead(true);
- return true;
- }
+ if (!skipVals)
+ setResult(v, ver);
+ else
+ setSkipValueResult(true, ver);
+
+ return true;
}
boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8523366..c6bceef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
@@ -1565,114 +1566,165 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
+ boolean evt = !skipVals;
+
// Optimisation: try to resolve value locally and escape 'get future' creation.
if (!forcePrimary && ctx.affinityNode()) {
- Map<K, V> locVals = U.newHashMap(keys.size());
-
- boolean success = true;
-
- // Optimistically expect that all keys are available locally (avoid creation of get future).
- for (KeyCacheObject key : keys) {
- GridCacheEntryEx entry = null;
-
- while (true) {
- try {
- entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- EntryGetResult getRes = null;
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- getRes = entry.innerGetVersioned(
- null,
- null,
- /*swap*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiry,
+ try {
+ Map<K, V> locVals = U.newHashMap(keys.size());
+
+ boolean success = true;
+ boolean offheapRead = ctx.offheapRead(expiry, false);
+
+ // Optimistically expect that all keys are available locally (avoid creation of get future).
+ for (KeyCacheObject key : keys) {
+ if (offheapRead) {
+ GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(key);
+
+ if (swapEntry != null) {
+ long expireTime = swapEntry.expireTime();
+
+ if (expireTime == 0 || expireTime < U.currentTimeMillis()) {
+ ctx.addResult(locVals,
+ key,
+ swapEntry.value(),
+ skipVals,
+ false,
+ deserializeBinary,
true,
- null);
-
- if (getRes != null) {
- v = getRes.value();
- ver = getRes.version();
- }
- }
- else {
- v = entry.innerGet(null,
- null,
- /*swap*/true,
- /*read-through*/false,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
- subjId,
null,
- taskName,
- expiry,
- !deserializeBinary);
- }
-
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- GridCacheVersion obsoleteVer = context().versions().next();
-
- if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
- removeEntry(entry);
-
- success = false;
+ swapEntry.version(),
+ 0,
+ 0,
+ needVer);
+
+ if (evt) {
+ ctx.events().readEvent(key,
+ null,
+ swapEntry.value(),
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
}
else
- ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true,
- getRes, ver, 0, 0, needVer);
+ success = false;
}
else
success = false;
-
- break; // While.
}
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, retry.
- }
- catch (GridDhtInvalidPartitionException ignored) {
- success = false;
+ else {
+ GridCacheEntryEx entry = null;
+
+ while (true) {
+ try {
+ entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
+
+ // If our DHT cache do has value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ EntryGetResult getRes = null;
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ getRes = entry.innerGetVersioned(
+ null,
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ true,
+ null);
+
+ if (getRes != null) {
+ v = getRes.value();
+ ver = getRes.version();
+ }
+ }
+ else {
+ v = entry.innerGet(null,
+ null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+ }
+
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ if (isNew && entry.markObsoleteIfEmpty(context().versions().next()))
+ removeEntry(entry);
+
+ success = false;
+ }
+ else {
+ ctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ getRes,
+ ver,
+ 0,
+ 0,
+ needVer);
+ }
+ }
+ else
+ success = false;
- break; // While.
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- finally {
- if (entry != null)
- ctx.evicts().touch(entry, topVer);
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, retry.
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ success = false;
+
+ break; // While.
+ }
+ finally {
+ if (entry != null)
+ ctx.evicts().touch(entry, topVer);
+ }
+ }
+
+ if (!success)
+ break;
+ else if (!skipVals && ctx.config().isStatisticsEnabled())
+ metrics0().onRead(true);
}
}
- if (!success)
- break;
- else if (!skipVals && ctx.config().isStatisticsEnabled())
- metrics0().onRead(true);
- }
+ if (success) {
+ sendTtlUpdateRequest(expiry);
- if (success) {
- sendTtlUpdateRequest(expiry);
+ return new GridFinishedFuture<>(locVals);
+ }
- return new GridFinishedFuture<>(locVals);
+ if (expiry != null)
+ expiry.reset();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
}
}
- if (expiry != null)
- expiry.reset();
-
// Either reload or not all values are available locally.
GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx,
keys,
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index c8556e5..4b1dd9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
@@ -451,121 +452,162 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
// Optimisation: try to resolve value locally and escape 'get future' creation.
if (!forcePrimary) {
- Map<K, V> locVals = null;
+ try {
+ Map<K, V> locVals = null;
- boolean success = true;
+ boolean success = true;
+ boolean offheapRead = ctx.offheapRead(expiryPlc, false);
+ boolean evt = !skipVals;
- // Optimistically expect that all keys are available locally (avoid creation of get future).
- for (KeyCacheObject key : keys) {
- GridCacheEntryEx entry = null;
-
- while (true) {
- try {
- entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- EntryGetResult getRes = null;
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- getRes = entry.innerGetVersioned(
- null,
- null,
- /*swap*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary,
- null);
-
- if (getRes != null) {
- v = getRes.value();
- ver = getRes.version();
- }
- }
- else {
- v = entry.innerGet(
- null,
- null,
- /*swap*/true,
- /*read-through*/false,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary);
- }
-
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- GridCacheVersion obsoleteVer = context().versions().next();
+ // Optimistically expect that all keys are available locally (avoid creation of get future).
+ for (KeyCacheObject key : keys) {
+ if (offheapRead) {
+ GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(key);
- if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
- removeEntry(entry);
-
- success = false;
- }
- else {
- if (locVals == null)
- locVals = U.newHashMap(keys.size());
+ if (swapEntry != null) {
+ long expireTime = swapEntry.expireTime();
+ if (expireTime == 0 || expireTime < U.currentTimeMillis()) {
ctx.addResult(locVals,
key,
- v,
+ swapEntry.value(),
skipVals,
- keepCacheObj,
+ false,
deserializeBinary,
true,
- getRes,
- ver,
+ null,
+ swapEntry.version(),
0,
0,
needVer);
+
+ if (evt) {
+ ctx.events().readEvent(key,
+ null,
+ swapEntry.value(),
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
}
+ else
+ success = false;
}
else
success = false;
-
- break; // While.
}
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, retry.
- }
- catch (GridDhtInvalidPartitionException ignored) {
- success = false;
+ else {
+ GridCacheEntryEx entry = null;
+
+ while (true) {
+ try {
+ entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
+
+ // If our DHT cache do has value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ EntryGetResult getRes = null;
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ getRes = entry.innerGetVersioned(
+ null,
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary,
+ null);
+
+ if (getRes != null) {
+ v = getRes.value();
+ ver = getRes.version();
+ }
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+ }
+
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ GridCacheVersion obsoleteVer = context().versions().next();
+
+ if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
+ removeEntry(entry);
+
+ success = false;
+ }
+ else {
+ if (locVals == null)
+ locVals = U.newHashMap(keys.size());
+
+ ctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObj,
+ deserializeBinary,
+ true,
+ getRes,
+ ver,
+ 0,
+ 0,
+ needVer);
+ }
+ }
+ else
+ success = false;
- break; // While.
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- finally {
- if (entry != null)
- context().evicts().touch(entry, topVer);
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, retry.
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ success = false;
+
+ break; // While.
+ }
+ finally {
+ if (entry != null)
+ context().evicts().touch(entry, topVer);
+ }
+ }
}
- }
- if (!success)
- break;
- else if (!skipVals && ctx.config().isStatisticsEnabled())
- ctx.cache().metrics0().onRead(true);
- }
+ if (!success)
+ break;
+ else if (!skipVals && ctx.config().isStatisticsEnabled())
+ ctx.cache().metrics0().onRead(true);
+ }
- if (success) {
- sendTtlUpdateRequest(expiryPlc);
+ if (success) {
+ sendTtlUpdateRequest(expiryPlc);
- return new GridFinishedFuture<>(locVals);
+ return new GridFinishedFuture<>(locVals);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index f86df2f..1f66fdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCachePreloader;
import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
@@ -397,97 +398,149 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
final IgniteCacheExpiryPolicy expiry = expiryPolicy(opCtx != null ? opCtx.expiry() : null);
boolean success = true;
+ final boolean offheapRead = ctx.offheapRead(expiry, false);
+ final boolean evt = !skipVals;
for (K key : keys) {
if (key == null)
throw new NullPointerException("Null key.");
- GridCacheEntryEx entry = null;
-
KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
- while (true) {
- try {
- entry = swapOrOffheap ? entryEx(cacheKey) : peekEx(cacheKey);
+ boolean skipEntry;
- if (entry != null) {
- CacheObject v;
+ if (offheapRead) {
+ skipEntry = true;
- if (needVer) {
- EntryGetResult res = entry.innerGetVersioned(
- null,
- null,
- /*swap*/swapOrOffheap,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary,
- null);
-
- if (res != null) {
- ctx.addResult(
- vals,
- cacheKey,
- res,
- skipVals,
- false,
- deserializeBinary,
- true,
- needVer);
- }
- else
- success = false;
- }
- else {
- v = entry.innerGet(
- null,
+ GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(cacheKey);
+
+ if (swapEntry != null) {
+ long expireTime = swapEntry.expireTime();
+
+ if (expireTime == 0 || expireTime < U.currentTimeMillis()) {
+ ctx.addResult(vals,
+ cacheKey,
+ swapEntry.value(),
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ null,
+ swapEntry.version(),
+ 0,
+ 0,
+ needVer);
+
+ if (configuration().isStatisticsEnabled() && !skipVals)
+ metrics0().onRead(true);
+
+ if (evt) {
+ ctx.events().readEvent(cacheKey,
null,
- /*swap*/swapOrOffheap,
- /*read-through*/false,
- /**update-metrics*/true,
- /**event*/!skipVals,
- /**temporary*/false,
+ swapEntry.value(),
subjId,
- null,
taskName,
- expiry,
!deserializeBinary);
+ }
+ }
+ else
+ skipEntry = false;
+ }
+ else
+ success = false;
+
+ if (skipEntry && !success && !storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
+ metrics0().onRead(false);
+ }
+ else
+ skipEntry = false;
+
+ if (!skipEntry) {
+ GridCacheEntryEx entry = null;
+
+ CacheObject v;
+
+ while (true) {
+ try {
+ entry = swapOrOffheap ? entryEx(cacheKey) : peekEx(cacheKey);
- if (v != null) {
- ctx.addResult(vals,
- cacheKey,
- v,
- skipVals,
- false,
- deserializeBinary,
- true,
+ if (entry != null) {
+ if (needVer) {
+ EntryGetResult res = entry.innerGetVersioned(
+ null,
+ null,
+ /*swap*/swapOrOffheap,
+ /*unmarshal*/true,
+ /*update-metrics*/false,
+ /*event*/!skipVals,
+ subjId,
null,
- 0,
- 0);
+ taskName,
+ expiry,
+ !deserializeBinary,
+ null);
+
+ if (res != null) {
+ ctx.addResult(
+ vals,
+ cacheKey,
+ res,
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ needVer);
+ }
+ else
+ success = false;
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*swap*/swapOrOffheap,
+ /*read-through*/false,
+ /*update-metrics*/true,
+ /*event*/!skipVals,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+
+ if (v != null) {
+ ctx.addResult(vals,
+ cacheKey,
+ v,
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ null,
+ 0,
+ 0);
+ }
+ else
+ success = false;
}
- else
- success = false;
}
- }
- else {
- if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
- metrics0().onRead(false);
+ else {
+ if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
+ metrics0().onRead(false);
- success = false;
- }
+ success = false;
+ }
- break; // While.
- }
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, retry.
- }
- finally {
- if (entry != null)
- ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, retry.
+ }
+ finally {
+ if (entry != null)
+ ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
+ }
}
if (!success && storeEnabled)
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
index faa63b3..0dbfc7e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -27,7 +30,6 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMemoryMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -93,37 +95,51 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testAtomicGet() throws Exception {
- doGet(ATOMIC, ONHEAP_TIERED);
+ doGet(ATOMIC, ONHEAP_TIERED, false);
+
+ doGet(ATOMIC, ONHEAP_TIERED, true);
}
/**
* @throws Exception If failed.
*/
public void testAtomicGetOffheap() throws Exception {
- doGet(ATOMIC, OFFHEAP_TIERED);
+ doGet(ATOMIC, OFFHEAP_TIERED, false);
+
+ doGet(ATOMIC, OFFHEAP_TIERED, true);
}
/**
* @throws Exception If failed.
*/
- private void doGet(CacheAtomicityMode atomicityMode, CacheMemoryMode memoryMode) throws Exception {
+ private void doGet(CacheAtomicityMode atomicityMode,
+ CacheMemoryMode memoryMode,
+ final boolean getAll) throws Exception {
Ignite srv = ignite(0);
Ignite client = ignite(1);
final IgniteCache cache = client.createCache(cacheConfiguration(atomicityMode, memoryMode));
+ final Map<Object, Object> data = new HashMap<>();
+
+ data.put(1, 1);
+ data.put(2, 2);
+
try {
// Get from compute closure.
{
- cache.put(1, 1);
+ cache.putAll(data);
hangLatch = new CountDownLatch(1);
processorStartLatch = new CountDownLatch(1);
IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
- cache.invoke(1, new HangEntryProcessor());
+ if (getAll)
+ cache.invokeAll(data.keySet(), new HangEntryProcessor());
+ else
+ cache.invoke(1, new HangEntryProcessor());
return null;
}
@@ -134,7 +150,14 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
assertTrue(wait);
- assertEquals(1, client.compute().affinityCall(cache.getName(), 1, new GetClosure(1, cache.getName())));
+ if (getAll) {
+ assertEquals(data, client.compute().affinityCall(cache.getName(), 1,
+ new GetAllClosure(data.keySet(), cache.getName())));
+ }
+ else {
+ assertEquals(1, client.compute().affinityCall(cache.getName(), 1,
+ new GetClosure(1, cache.getName())));
+ }
hangLatch.countDown();
@@ -147,14 +170,17 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
// Local get.
{
- cache.put(1, 1);
+ cache.putAll(data);
hangLatch = new CountDownLatch(1);
processorStartLatch = new CountDownLatch(1);
IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
- cache.invoke(1, new HangEntryProcessor());
+ if (getAll)
+ cache.invokeAll(data.keySet(), new HangEntryProcessor());
+ else
+ cache.invoke(1, new HangEntryProcessor());
return null;
}
@@ -165,7 +191,10 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
assertTrue(wait);
- assertEquals(1, srv.cache(cache.getName()).get(1));
+ if (getAll)
+ assertEquals(data, srv.cache(cache.getName()).getAll(data.keySet()));
+ else
+ assertEquals(1, srv.cache(cache.getName()).get(1));
hangLatch.countDown();
@@ -249,4 +278,32 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
return ignite.cache(cacheName).get(key);
}
}
+
+ /**
+ *
+ */
+ public static class GetAllClosure implements IgniteCallable<Object> {
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** */
+ private final Set<Object> keys;
+
+ /** */
+ private final String cacheName;
+
+ /**
+ * @param keys Keys.
+ */
+ public GetAllClosure(Set<Object> keys, String cacheName) {
+ this.keys = keys;
+ this.cacheName = cacheName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ return ignite.cache(cacheName).getAll(keys);
+ }
+ }
}
[4/4] ignite git commit: ignite-4932 WIP
Posted by sb...@apache.org.
ignite-4932 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f2d9ea0c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f2d9ea0c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f2d9ea0c
Branch: refs/heads/ignite-4932
Commit: f2d9ea0cc2a1b6fd66d00bc386f36c53757fe28c
Parents: 7d1d975
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 12 17:23:13 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 12 17:44:46 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 18 +++++++++++++++++-
.../processors/cache/GridCacheContext.java | 5 ++++-
.../processors/cache/GridCacheEventManager.java | 16 ++++++++++++----
.../processors/cache/GridCacheMapEntry.java | 2 +-
.../processors/cache/IgniteCacheExpiryPolicy.java | 5 +++++
.../cache/IgniteCacheNoSyncForGetTest.java | 11 ++++++-----
.../ignite/testsuites/IgniteCacheTestSuite2.java | 7 +++++--
7 files changed, 50 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2d9ea0c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 5042f77..c70541a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -5838,8 +5838,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return null;
return new CacheExpiryPolicy() {
+ private Long accessTtl;
+
@Override public long forAccess() {
- return CU.toTtl(expiryPlc.getExpiryForAccess());
+ if (accessTtl == null)
+ accessTtl = CU.toTtl(expiryPlc.getExpiryForAccess());
+
+ return accessTtl;
+ }
+
+ @Override public boolean hasAccessTtl() {
+ if (accessTtl == null)
+ accessTtl = CU.toTtl(expiryPlc.getExpiryForAccess());
+
+ return accessTtl != CU.TTL_NOT_CHANGED;
}
@Override public long forCreate() {
@@ -5870,6 +5882,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return accessTtl;
}
+ @Override public boolean hasAccessTtl() {
+ return accessTtl != CU.TTL_NOT_CHANGED;
+ }
+
/** {@inheritDoc} */
@Override public long forUpdate() {
return CU.TTL_NOT_CHANGED;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2d9ea0c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 0985161..b4668a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2063,7 +2063,10 @@ public class GridCacheContext<K, V> implements Externalizable {
* @return
*/
public boolean offheapRead(IgniteCacheExpiryPolicy expiryPlc, boolean readers) {
- return offheapTiered() && isSwapOrOffheapEnabled() && expiryPlc == null && !readers;
+ return offheapTiered() &&
+ isSwapOrOffheapEnabled() &&
+ (expiryPlc == null || !expiryPlc.hasAccessTtl()) &&
+ !readers;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2d9ea0c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index 8953b63..7a417d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -62,11 +62,19 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
cctx.gridEvents().removeLocalEventListener(lsnr);
}
+ /**
+ * @param key Key for event.
+ * @param tx Possible surrounding transaction.
+ * @param val Read value.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
+ * @param keepBinary Keep binary flag.
+ */
public void readEvent(KeyCacheObject key,
- IgniteInternalTx tx,
- CacheObject val,
- UUID subjId,
- String taskName,
+ @Nullable IgniteInternalTx tx,
+ @Nullable CacheObject val,
+ @Nullable UUID subjId,
+ @Nullable String taskName,
boolean keepBinary) {
if (isRecordable(EVT_CACHE_OBJECT_READ)) {
addEvent(cctx.affinity().partition(key),
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2d9ea0c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index b9ebed3..00e5199 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -874,7 +874,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Object res = null;
- if (readerArgs == null && expiryPlc == null && !retVer && cctx.config().isEagerTtl()) {
+ if (readerArgs == null && (expiryPlc == null || !expiryPlc.hasAccessTtl()) && !retVer && cctx.config().isEagerTtl()) {
// Fast heap get without 'synchronized'.
CacheObject val0 = this.val;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2d9ea0c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
index f82c5f0..1d3e056 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
@@ -46,6 +46,11 @@ public interface IgniteCacheExpiryPolicy {
public long forAccess();
/**
+ * @return {@code True}
+ */
+ public boolean hasAccessTtl();
+
+ /**
* Callback for ttl update on entry access.
*
* @param key Entry key.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2d9ea0c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
index 0dbfc7e..85212c5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
@@ -42,9 +42,10 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheMemoryMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
*
@@ -268,7 +269,7 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
/**
* @param key Key.
*/
- public GetClosure(int key, String cacheName) {
+ GetClosure(int key, String cacheName) {
this.key = key;
this.cacheName = cacheName;
}
@@ -296,7 +297,7 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
/**
* @param keys Keys.
*/
- public GetAllClosure(Set<Object> keys, String cacheName) {
+ GetAllClosure(Set<Object> keys, String cacheName) {
this.keys = keys;
this.cacheName = cacheName;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2d9ea0c/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index e37a8a1..6d0745d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheVariableTopologySelf
import org.apache.ignite.internal.processors.cache.IgniteAtomicCacheEntryProcessorNodeJoinTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheEntryProcessorNodeJoinTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheIncrementTxTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheNoSyncForGetTest;
import org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest;
import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop;
import org.apache.ignite.internal.processors.cache.OffheapCacheOnClientsTest;
@@ -114,11 +115,11 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePar
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedTxSingleThreadedSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedTxTimeoutSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheRendezvousAffinityClientSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridPartitionedBackupLoadSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheStoreUpdateTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOffheapCacheStoreUpdateTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridPartitionedBackupLoadSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.NearCacheSyncUpdateTest;
import org.apache.ignite.internal.processors.cache.distributed.near.NoneRebalanceModeSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOffheapCacheStoreUpdateTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedEvictionSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedJobExecutionTest;
import org.apache.ignite.internal.processors.cache.local.GridCacheLocalAtomicBasicStoreSelfTest;
@@ -278,6 +279,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(GridNearCacheStoreUpdateTest.class));
suite.addTest(new TestSuite(GridNearOffheapCacheStoreUpdateTest.class));
+ suite.addTest(new TestSuite(IgniteCacheNoSyncForGetTest.class));
+
return suite;
}
}