You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/12 14:44:57 UTC
[2/4] ignite git commit: ignite-4932 WIP
ignite-4932 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59c9707c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59c9707c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59c9707c
Branch: refs/heads/ignite-4932
Commit: 59c9707ccef73d2cce5ba7171225be995c247276
Parents: f9f4256
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 12 16:00:12 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 12 17:00:40 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 152 ++++++++----
.../processors/cache/GridCacheContext.java | 9 +
.../processors/cache/GridCacheEventManager.java | 24 ++
.../processors/cache/GridCacheMapEntry.java | 45 ++--
.../processors/cache/GridCacheSwapManager.java | 22 ++
.../dht/GridPartitionedGetFuture.java | 152 +++++++-----
.../dht/GridPartitionedSingleGetFuture.java | 139 +++++++----
.../dht/atomic/GridDhtAtomicCache.java | 230 ++++++++++++-------
.../dht/colocated/GridDhtColocatedCache.java | 228 ++++++++++--------
.../local/atomic/GridLocalAtomicCache.java | 197 ++++++++++------
.../cache/IgniteCacheNoSyncForGetTest.java | 77 ++++++-
11 files changed, 832 insertions(+), 443 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 27a5750..5042f77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1908,80 +1908,130 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
Map<KeyCacheObject, EntryGetResult> misses = null;
+ boolean offheapRead = ctx.offheapRead(expiry, readerArgs != null);
+
for (KeyCacheObject key : keys) {
while (true) {
- GridCacheEntryEx entry = needEntry ? entryEx(key) : peekEx(key);
-
- if (entry == null) {
- if (!skipVals && ctx.config().isStatisticsEnabled())
- ctx.cache().metrics0().onRead(false);
-
- break;
- }
-
try {
- EntryGetResult res;
+ EntryGetResult res = null;
boolean evt = !skipVals;
boolean updateMetrics = !skipVals;
- if (storeEnabled) {
- res = entry.innerGetAndReserveForLoad(ctx.isSwapOrOffheapEnabled(),
- updateMetrics,
- evt,
- subjId,
- taskName,
- expiry,
- !deserializeBinary,
- readerArgs);
+ GridCacheEntryEx entry = null;
+
+ boolean skipEntry;
+
+ if (offheapRead) {
+ GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(key);
- assert res != null;
+ if (swapEntry != null) {
+ skipEntry = true;
- if (res.value() == null) {
- if (misses == null)
- misses = new HashMap<>();
+ long expireTime = swapEntry.expireTime();
- misses.put(key, res);
+ if (expireTime != 0) {
+ if (expireTime - U.currentTimeMillis() > 0) {
+ res = new EntryGetWithTtlResult(swapEntry.value(),
+ swapEntry.version(),
+ false,
+ expireTime,
+ swapEntry.ttl());
+ }
+ else
+ skipEntry = false; // Do not skip entry if need process expiration.
+ }
+ else
+ res = new EntryGetResult(swapEntry.value(), swapEntry.version(), false);
+ }
+ else
+ skipEntry = !storeEnabled;
+
+ if (skipEntry) {
+ if (evt) {
+ ctx.events().readEvent(key,
+ null,
+ swapEntry != null ? swapEntry.value() : null,
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
- res = null;
+ if (updateMetrics && ctx.cache().configuration().isStatisticsEnabled())
+ ctx.cache().metrics0().onRead(swapEntry != null);
}
}
- else {
- if (needVer || readerArgs != null) {
- res = entry.innerGetVersioned(
- null,
- null,
- ctx.isSwapOrOffheapEnabled(),
- /*unmarshal*/true,
+ else
+ skipEntry = false;
+
+ if (!skipEntry) {
+ entry = needEntry ? entryEx(key) : peekEx(key);
+
+ if (entry == null) {
+ if (!skipVals && ctx.config().isStatisticsEnabled())
+ ctx.cache().metrics0().onRead(false);
+
+ break;
+ }
+
+ if (storeEnabled) {
+ res = entry.innerGetAndReserveForLoad(ctx.isSwapOrOffheapEnabled(),
updateMetrics,
evt,
subjId,
- null,
taskName,
expiry,
!deserializeBinary,
readerArgs);
+
+ assert res != null;
+
+ if (res.value() == null) {
+ if (misses == null)
+ misses = new HashMap<>();
+
+ misses.put(key, res);
+
+ res = null;
+ }
}
else {
- CacheObject val = entry.innerGet(
- null,
- null,
- ctx.isSwapOrOffheapEnabled(),
- false,
- updateMetrics,
- evt,
- false,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary);
+ if (needVer || readerArgs != null) {
+ res = entry.innerGetVersioned(
+ null,
+ null,
+ ctx.isSwapOrOffheapEnabled(),
+ /*unmarshal*/true,
+ updateMetrics,
+ evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary,
+ readerArgs);
+ }
+ else {
+ CacheObject val = entry.innerGet(
+ null,
+ null,
+ ctx.isSwapOrOffheapEnabled(),
+ false,
+ updateMetrics,
+ evt,
+ false,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+
+ res = val != null ? new EntryGetResult(val, null) : null;
+ }
- res = val != null ? new EntryGetResult(val, null) : null;
+ if (res == null)
+ ctx.evicts().touch(entry, topVer);
}
-
- if (res == null)
- ctx.evicts().touch(entry, topVer);
}
if (res != null) {
@@ -1994,7 +2044,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
true,
needVer);
- if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
+ if (entry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)))
ctx.evicts().touch(entry, topVer);
if (keysSize == 1)
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 3b44b50..0985161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2058,6 +2058,15 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * @param expiryPlc Expiry policy of the read; must be {@code null} for this method to return {@code true}.
+ * @param readers {@code True} if the caller supplies reader arguments (for example {@code readerArgs != null}).
+ * @return {@code True} only if {@link #offheapTiered()} and {@link #isSwapOrOffheapEnabled()} both hold, no expiry policy is given and there are no readers.
+ */
+ public boolean offheapRead(IgniteCacheExpiryPolicy expiryPlc, boolean readers) {
+ return offheapTiered() && isSwapOrOffheapEnabled() && expiryPlc == null && !readers;
+ }
+
+ /**
* @param part Partition.
* @param affNodes Affinity nodes.
* @param topVer Topology version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index 1c18738..8953b63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
import static org.apache.ignite.events.EventType.EVT_CACHE_STARTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED;
@@ -61,6 +62,29 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
cctx.gridEvents().removeLocalEventListener(lsnr);
}
+ public void readEvent(KeyCacheObject key,
+ IgniteInternalTx tx,
+ CacheObject val,
+ UUID subjId,
+ String taskName,
+ boolean keepBinary) {
+ if (isRecordable(EVT_CACHE_OBJECT_READ)) { // Adds an EVT_CACHE_OBJECT_READ event for the key, but only when that event type is recordable.
+ addEvent(cctx.affinity().partition(key), // Partition is resolved from the key through cache affinity.
+ key,
+ tx,
+ null, // NOTE(review): GridCacheMapEntry passes mvcc.anyOwner() in this position; no owner is supplied here.
+ EVT_CACHE_OBJECT_READ,
+ val,
+ val != null,
+ val, // Same value is passed a second time (presumably new and old value - TODO confirm against addEvent signature).
+ val != null,
+ subjId,
+ null, // NOTE(review): GridCacheMapEntry passes the transform closure class name in this position; none is supplied here.
+ taskName,
+ keepBinary);
+ }
+ }
+
/**
* @param part Partition.
* @param key Key for the event.
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 34f8b96..b9ebed3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -874,31 +874,36 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Object res = null;
- // TODO IGNITE-4932: metrics/events.
+ if (readerArgs == null && expiryPlc == null && !retVer && cctx.config().isEagerTtl()) {
+ // Fast heap get without 'synchronized'.
+ CacheObject val0 = this.val;
- if (readerArgs == null && expiryPlc == null) {
- if (!retVer && cctx.config().isEagerTtl()) { // Fast heap get.
- CacheObject val0 = this.val;
-
- if (val0 != null)
- return val0;
- }
+ if (val0 != null) {
+ if (updateMetrics && cctx.cache().configuration().isStatisticsEnabled())
+ cctx.cache().metrics0().onRead(true);
- if (cctx.isSwapOrOffheapEnabled() && readSwap) {
- GridCacheSwapEntry swapEntry = cctx.swap().read(this, false, true, true, false);
+ if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+ transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
- if (swapEntry != null) {
- long expireTime = swapEntry.expireTime();
+ GridCacheMvcc mvcc = mvccExtras();
- if (expireTime != 0) {
- if (expireTime - U.currentTimeMillis() > 0) {
- return retVer ? new EntryGetWithTtlResult(val, ver, false, expireTime, swapEntry.ttl()) :
- swapEntry.value();
- }
- }
- else
- return retVer ? new EntryGetResult(val, ver, false) : swapEntry.value();
+ cctx.events().addEvent(
+ partition(),
+ key,
+ tx,
+ mvcc != null ? mvcc.anyOwner() : null,
+ EVT_CACHE_OBJECT_READ,
+ val0,
+ true,
+ val0,
+ true,
+ subjId,
+ transformClo != null ? transformClo.getClass().getName() : null,
+ taskName,
+ keepBinary);
}
+
+ return val0;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 159b3b8..07edaff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -819,6 +819,28 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
}
/**
+ * @param key Key.
+ * @return Swap entry read for the key, or {@code null} if the underlying {@code read} call returned none.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public GridCacheSwapEntry readSwapEntry(KeyCacheObject key) throws IgniteCheckedException {
+ assert offheapEnabled || swapEnabled;
+
+ GridCacheSwapEntry entry = read(key, // NOTE(review): meaning of the boolean flags below is defined by read(...), which is not visible here.
+ key.valueBytes(cctx.cacheObjectContext()),
+ cctx.affinity().partition(key),
+ false,
+ true,
+ true,
+ false);
+
+ assert entry == null || entry.value() != null : entry; // A returned entry is expected to carry a value...
+ assert entry == null || entry.version() != null : entry; // ...and a version.
+
+ return entry;
+ }
+
+ /**
* @param entry Entry to read.
* @return Read value address.
* @throws IgniteCheckedException If read failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 519239a..798e2dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -437,81 +438,118 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
GridDhtCacheAdapter<K, V> cache = cache();
- while (true) {
- GridCacheEntryEx entry;
+ boolean offheapRead = cctx.offheapRead(expiryPlc, false);
+ boolean evt = !skipVals;
+ while (true) {
try {
- entry = cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key);
+ boolean skipEntry;
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
+ EntryGetResult getRes = null;
+ CacheObject v = null;
+ GridCacheVersion ver = null;
- EntryGetResult getRes = null;
- CacheObject v = null;
- GridCacheVersion ver = null;
+ if (offheapRead) {
+ skipEntry = true;
- if (needVer) {
- getRes = entry.innerGetVersioned(
- null,
- null,
- /*swap*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary,
- null);
+ GridCacheSwapEntry swapEntry = cctx.swap().readSwapEntry(key);
+
+ if (swapEntry != null) {
+ long expireTime = swapEntry.expireTime();
+
+ if (expireTime == 0 || expireTime > U.currentTimeMillis()) { // Use the offheap value only if it has no expire time or has not expired yet.
+ v = swapEntry.value();
- if (getRes != null) {
- v = getRes.value();
- ver = getRes.version();
+ if (needVer)
+ ver = swapEntry.version();
}
+ else // Expired: go through the cache entry so that expiration is processed.
+ skipEntry = false;
}
- else {
- v = entry.innerGet(
- null,
+
+ if (skipEntry && evt) {
+ cctx.events().readEvent(key,
null,
- /*swap*/true,
- /*read-through*/false,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
+ swapEntry != null ? swapEntry.value() : null,
subjId,
- null,
taskName,
- expiryPlc,
!deserializeBinary);
}
+ }
+ else
+ skipEntry = false;
+
+ if (!skipEntry) {
+ GridCacheEntryEx entry =
+ cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key);
+
+ // If our DHT cache has the value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ if (needVer) {
+ getRes = entry.innerGetVersioned(
+ null,
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary,
+ null);
+
+ if (getRes != null) {
+ v = getRes.value();
+ ver = getRes.version();
+ }
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+ }
- cache.context().evicts().touch(entry, topVer);
+ cache.context().evicts().touch(entry, topVer);
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- if (isNew && entry.markObsoleteIfEmpty(ver))
- cache.removeEntry(entry);
- }
- else {
- cctx.addResult(locVals,
- key,
- v,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- true,
- getRes,
- ver,
- 0,
- 0,
- needVer);
-
- return true;
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ if (isNew && entry.markObsoleteIfEmpty(ver))
+ cache.removeEntry(entry);
+ }
}
}
+ if (v != null) {
+ cctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ true,
+ getRes,
+ ver,
+ 0,
+ 0,
+ needVer);
+
+ return true;
+ }
+
boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
// Entry not found, do not continue search if topology did not change and there is no store.
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index a3f6b72..11d4fc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
@@ -360,74 +361,110 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
GridDhtCacheAdapter colocated = cctx.dht();
- while (true) {
- GridCacheEntryEx entry;
+ boolean offheapRead = cctx.offheapRead(expiryPlc, false);
+ boolean evt = !skipVals;
+ while (true) {
try {
- entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
- colocated.peekEx(key);
+ CacheObject v = null;
+ GridCacheVersion ver = null;
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
+ boolean skipEntry;
- CacheObject v = null;
- GridCacheVersion ver = null;
+ if (offheapRead) {
+ skipEntry = true;
- if (needVer) {
- EntryGetResult res = entry.innerGetVersioned(
- null,
- null,
- /*swap*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- true,
- null);
+ GridCacheSwapEntry swapEntry = cctx.swap().readSwapEntry(key);
- if (res != null) {
- v = res.value();
- ver = res.version();
+ if (swapEntry != null) {
+ long expireTime = swapEntry.expireTime();
+
+ if (expireTime == 0 || expireTime > U.currentTimeMillis()) { // Use the offheap value only if it has no expire time or has not expired yet.
+ v = swapEntry.value();
+
+ if (needVer)
+ ver = swapEntry.version();
}
+ else // Expired: go through the cache entry so that expiration is processed.
+ skipEntry = false;
}
- else {
- v = entry.innerGet(
- null,
+
+ if (skipEntry && evt) {
+ cctx.events().readEvent(key,
null,
- /*swap*/true,
- /*read-through*/false,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
+ swapEntry != null ? swapEntry.value() : null,
subjId,
- null,
taskName,
- expiryPlc,
- true);
+ !deserializeBinary);
}
+ }
+ else
+ skipEntry = false;
+
+ if (!skipEntry) {
+ GridCacheEntryEx entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
+ colocated.peekEx(key);
+
+ // If our DHT cache has the value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ if (needVer) {
+ EntryGetResult res = entry.innerGetVersioned(
+ null,
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ true,
+ null);
+
+ if (res != null) {
+ v = res.value();
+ ver = res.version();
+ }
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ true);
+ }
- colocated.context().evicts().touch(entry, topVer);
+ colocated.context().evicts().touch(entry, topVer);
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- if (isNew && entry.markObsoleteIfEmpty(ver))
- colocated.removeEntry(entry);
+ if (v == null) {
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (isNew && entry.markObsoleteIfEmpty(ver))
+ colocated.removeEntry(entry);
+ }
}
- else {
- if (!skipVals && cctx.config().isStatisticsEnabled())
- cctx.cache().metrics0().onRead(true);
+ }
- if (!skipVals)
- setResult(v, ver);
- else
- setSkipValueResult(true, ver);
+ if (v != null) {
+ if (!skipVals && cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onRead(true);
- return true;
- }
+ if (!skipVals)
+ setResult(v, ver);
+ else
+ setSkipValueResult(true, ver);
+
+ return true;
}
boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8523366..c6bceef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
@@ -1565,114 +1566,165 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
+ boolean evt = !skipVals;
+
// Optimisation: try to resolve value locally and escape 'get future' creation.
if (!forcePrimary && ctx.affinityNode()) {
- Map<K, V> locVals = U.newHashMap(keys.size());
-
- boolean success = true;
-
- // Optimistically expect that all keys are available locally (avoid creation of get future).
- for (KeyCacheObject key : keys) {
- GridCacheEntryEx entry = null;
-
- while (true) {
- try {
- entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- EntryGetResult getRes = null;
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- getRes = entry.innerGetVersioned(
- null,
- null,
- /*swap*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiry,
+ try {
+ Map<K, V> locVals = U.newHashMap(keys.size());
+
+ boolean success = true;
+ boolean offheapRead = ctx.offheapRead(expiry, false);
+
+ // Optimistically expect that all keys are available locally (avoid creation of get future).
+ for (KeyCacheObject key : keys) {
+ if (offheapRead) {
+ GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(key);
+
+ if (swapEntry != null) {
+ long expireTime = swapEntry.expireTime();
+
+ if (expireTime == 0 || expireTime > U.currentTimeMillis()) { // Use the offheap value only if it has no expire time or has not expired yet.
+ ctx.addResult(locVals,
+ key,
+ swapEntry.value(),
+ skipVals,
+ false,
+ deserializeBinary,
true,
- null);
-
- if (getRes != null) {
- v = getRes.value();
- ver = getRes.version();
- }
- }
- else {
- v = entry.innerGet(null,
- null,
- /*swap*/true,
- /*read-through*/false,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
- subjId,
null,
- taskName,
- expiry,
- !deserializeBinary);
- }
-
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- GridCacheVersion obsoleteVer = context().versions().next();
-
- if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
- removeEntry(entry);
-
- success = false;
+ swapEntry.version(),
+ 0,
+ 0,
+ needVer);
+
+ if (evt) {
+ ctx.events().readEvent(key,
+ null,
+ swapEntry.value(),
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
}
else
- ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true,
- getRes, ver, 0, 0, needVer);
+ success = false;
}
else
success = false;
-
- break; // While.
}
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, retry.
- }
- catch (GridDhtInvalidPartitionException ignored) {
- success = false;
+ else {
+ GridCacheEntryEx entry = null;
+
+ while (true) {
+ try {
+ entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
+
+ // If our DHT cache has the value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ EntryGetResult getRes = null;
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ getRes = entry.innerGetVersioned(
+ null,
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ true,
+ null);
+
+ if (getRes != null) {
+ v = getRes.value();
+ ver = getRes.version();
+ }
+ }
+ else {
+ v = entry.innerGet(null,
+ null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+ }
+
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ if (isNew && entry.markObsoleteIfEmpty(context().versions().next()))
+ removeEntry(entry);
+
+ success = false;
+ }
+ else {
+ ctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ getRes,
+ ver,
+ 0,
+ 0,
+ needVer);
+ }
+ }
+ else
+ success = false;
- break; // While.
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- finally {
- if (entry != null)
- ctx.evicts().touch(entry, topVer);
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, retry.
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ success = false;
+
+ break; // While.
+ }
+ finally {
+ if (entry != null)
+ ctx.evicts().touch(entry, topVer);
+ }
+ }
+
+ if (!success)
+ break;
+ else if (!skipVals && ctx.config().isStatisticsEnabled())
+ metrics0().onRead(true);
}
}
- if (!success)
- break;
- else if (!skipVals && ctx.config().isStatisticsEnabled())
- metrics0().onRead(true);
- }
+ if (success) {
+ sendTtlUpdateRequest(expiry);
- if (success) {
- sendTtlUpdateRequest(expiry);
+ return new GridFinishedFuture<>(locVals);
+ }
- return new GridFinishedFuture<>(locVals);
+ if (expiry != null)
+ expiry.reset();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
}
}
- if (expiry != null)
- expiry.reset();
-
// Either reload or not all values are available locally.
GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx,
keys,
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index c8556e5..4b1dd9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
@@ -451,121 +452,162 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
// Optimisation: try to resolve value locally and escape 'get future' creation.
if (!forcePrimary) {
- Map<K, V> locVals = null;
+ try {
+ Map<K, V> locVals = null;
- boolean success = true;
+ boolean success = true;
+ boolean offheapRead = ctx.offheapRead(expiryPlc, false);
+ boolean evt = !skipVals;
- // Optimistically expect that all keys are available locally (avoid creation of get future).
- for (KeyCacheObject key : keys) {
- GridCacheEntryEx entry = null;
-
- while (true) {
- try {
- entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
-
- // If our DHT cache do has value, then we peek it.
- if (entry != null) {
- boolean isNew = entry.isNewLocked();
-
- EntryGetResult getRes = null;
- CacheObject v = null;
- GridCacheVersion ver = null;
-
- if (needVer) {
- getRes = entry.innerGetVersioned(
- null,
- null,
- /*swap*/true,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary,
- null);
-
- if (getRes != null) {
- v = getRes.value();
- ver = getRes.version();
- }
- }
- else {
- v = entry.innerGet(
- null,
- null,
- /*swap*/true,
- /*read-through*/false,
- /**update-metrics*/false,
- /*event*/!skipVals,
- /*temporary*/false,
- subjId,
- null,
- taskName,
- expiryPlc,
- !deserializeBinary);
- }
-
- // Entry was not in memory or in swap, so we remove it from cache.
- if (v == null) {
- GridCacheVersion obsoleteVer = context().versions().next();
+ // Optimistically expect that all keys are available locally (avoid creation of get future).
+ for (KeyCacheObject key : keys) {
+ if (offheapRead) {
+ GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(key);
- if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
- removeEntry(entry);
-
- success = false;
- }
- else {
- if (locVals == null)
- locVals = U.newHashMap(keys.size());
+ if (swapEntry != null) {
+ long expireTime = swapEntry.expireTime();
+ if (expireTime == 0 || expireTime < U.currentTimeMillis()) {
ctx.addResult(locVals,
key,
- v,
+ swapEntry.value(),
skipVals,
- keepCacheObj,
+ false,
deserializeBinary,
true,
- getRes,
- ver,
+ null,
+ swapEntry.version(),
0,
0,
needVer);
+
+ if (evt) {
+ ctx.events().readEvent(key,
+ null,
+ swapEntry.value(),
+ subjId,
+ taskName,
+ !deserializeBinary);
+ }
}
+ else
+ success = false;
}
else
success = false;
-
- break; // While.
}
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, retry.
- }
- catch (GridDhtInvalidPartitionException ignored) {
- success = false;
+ else {
+ GridCacheEntryEx entry = null;
+
+ while (true) {
+ try {
+ entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key);
+
+ // If our DHT cache does have a value, then we peek it.
+ if (entry != null) {
+ boolean isNew = entry.isNewLocked();
+
+ EntryGetResult getRes = null;
+ CacheObject v = null;
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ getRes = entry.innerGetVersioned(
+ null,
+ null,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /*update-metrics*/false,
+ /*event*/evt,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary,
+ null);
+
+ if (getRes != null) {
+ v = getRes.value();
+ ver = getRes.version();
+ }
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*swap*/true,
+ /*read-through*/false,
+ /*update-metrics*/false,
+ /*event*/evt,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiryPlc,
+ !deserializeBinary);
+ }
+
+ // Entry was not in memory or in swap, so we remove it from cache.
+ if (v == null) {
+ GridCacheVersion obsoleteVer = context().versions().next();
+
+ if (isNew && entry.markObsoleteIfEmpty(obsoleteVer))
+ removeEntry(entry);
+
+ success = false;
+ }
+ else {
+ if (locVals == null)
+ locVals = U.newHashMap(keys.size());
+
+ ctx.addResult(locVals,
+ key,
+ v,
+ skipVals,
+ keepCacheObj,
+ deserializeBinary,
+ true,
+ getRes,
+ ver,
+ 0,
+ 0,
+ needVer);
+ }
+ }
+ else
+ success = false;
- break; // While.
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- finally {
- if (entry != null)
- context().evicts().touch(entry, topVer);
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, retry.
+ }
+ catch (GridDhtInvalidPartitionException ignored) {
+ success = false;
+
+ break; // While.
+ }
+ finally {
+ if (entry != null)
+ context().evicts().touch(entry, topVer);
+ }
+ }
}
- }
- if (!success)
- break;
- else if (!skipVals && ctx.config().isStatisticsEnabled())
- ctx.cache().metrics0().onRead(true);
- }
+ if (!success)
+ break;
+ else if (!skipVals && ctx.config().isStatisticsEnabled())
+ ctx.cache().metrics0().onRead(true);
+ }
- if (success) {
- sendTtlUpdateRequest(expiryPlc);
+ if (success) {
+ sendTtlUpdateRequest(expiryPlc);
- return new GridFinishedFuture<>(locVals);
+ return new GridFinishedFuture<>(locVals);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index f86df2f..1f66fdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCachePreloader;
import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
@@ -397,97 +398,149 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
final IgniteCacheExpiryPolicy expiry = expiryPolicy(opCtx != null ? opCtx.expiry() : null);
boolean success = true;
+ final boolean offheapRead = ctx.offheapRead(expiry, false);
+ final boolean evt = !skipVals;
for (K key : keys) {
if (key == null)
throw new NullPointerException("Null key.");
- GridCacheEntryEx entry = null;
-
KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
- while (true) {
- try {
- entry = swapOrOffheap ? entryEx(cacheKey) : peekEx(cacheKey);
+ boolean skipEntry;
- if (entry != null) {
- CacheObject v;
+ if (offheapRead) {
+ skipEntry = true;
- if (needVer) {
- EntryGetResult res = entry.innerGetVersioned(
- null,
- null,
- /*swap*/swapOrOffheap,
- /*unmarshal*/true,
- /**update-metrics*/false,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary,
- null);
-
- if (res != null) {
- ctx.addResult(
- vals,
- cacheKey,
- res,
- skipVals,
- false,
- deserializeBinary,
- true,
- needVer);
- }
- else
- success = false;
- }
- else {
- v = entry.innerGet(
- null,
+ GridCacheSwapEntry swapEntry = ctx.swap().readSwapEntry(cacheKey);
+
+ if (swapEntry != null) {
+ long expireTime = swapEntry.expireTime();
+
+ if (expireTime == 0 || expireTime < U.currentTimeMillis()) {
+ ctx.addResult(vals,
+ cacheKey,
+ swapEntry.value(),
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ null,
+ swapEntry.version(),
+ 0,
+ 0,
+ needVer);
+
+ if (configuration().isStatisticsEnabled() && !skipVals)
+ metrics0().onRead(true);
+
+ if (evt) {
+ ctx.events().readEvent(cacheKey,
null,
- /*swap*/swapOrOffheap,
- /*read-through*/false,
- /**update-metrics*/true,
- /**event*/!skipVals,
- /**temporary*/false,
+ swapEntry.value(),
subjId,
- null,
taskName,
- expiry,
!deserializeBinary);
+ }
+ }
+ else
+ skipEntry = false;
+ }
+ else
+ success = false;
+
+ if (skipEntry && !success && !storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
+ metrics0().onRead(false);
+ }
+ else
+ skipEntry = false;
+
+ if (!skipEntry) {
+ GridCacheEntryEx entry = null;
+
+ CacheObject v;
+
+ while (true) {
+ try {
+ entry = swapOrOffheap ? entryEx(cacheKey) : peekEx(cacheKey);
- if (v != null) {
- ctx.addResult(vals,
- cacheKey,
- v,
- skipVals,
- false,
- deserializeBinary,
- true,
+ if (entry != null) {
+ if (needVer) {
+ EntryGetResult res = entry.innerGetVersioned(
+ null,
+ null,
+ /*swap*/swapOrOffheap,
+ /*unmarshal*/true,
+ /*update-metrics*/false,
+ /*event*/!skipVals,
+ subjId,
null,
- 0,
- 0);
+ taskName,
+ expiry,
+ !deserializeBinary,
+ null);
+
+ if (res != null) {
+ ctx.addResult(
+ vals,
+ cacheKey,
+ res,
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ needVer);
+ }
+ else
+ success = false;
+ }
+ else {
+ v = entry.innerGet(
+ null,
+ null,
+ /*swap*/swapOrOffheap,
+ /*read-through*/false,
+ /*update-metrics*/true,
+ /*event*/!skipVals,
+ /*temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+
+ if (v != null) {
+ ctx.addResult(vals,
+ cacheKey,
+ v,
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ null,
+ 0,
+ 0);
+ }
+ else
+ success = false;
}
- else
- success = false;
}
- }
- else {
- if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
- metrics0().onRead(false);
+ else {
+ if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
+ metrics0().onRead(false);
- success = false;
- }
+ success = false;
+ }
- break; // While.
- }
- catch (GridCacheEntryRemovedException ignored) {
- // No-op, retry.
- }
- finally {
- if (entry != null)
- ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // No-op, retry.
+ }
+ finally {
+ if (entry != null)
+ ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
+ }
}
if (!success && storeEnabled)
http://git-wip-us.apache.org/repos/asf/ignite/blob/59c9707c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
index faa63b3..0dbfc7e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -27,7 +30,6 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMemoryMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -93,37 +95,51 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testAtomicGet() throws Exception {
- doGet(ATOMIC, ONHEAP_TIERED);
+ doGet(ATOMIC, ONHEAP_TIERED, false);
+
+ doGet(ATOMIC, ONHEAP_TIERED, true);
}
/**
* @throws Exception If failed.
*/
public void testAtomicGetOffheap() throws Exception {
- doGet(ATOMIC, OFFHEAP_TIERED);
+ doGet(ATOMIC, OFFHEAP_TIERED, false);
+
+ doGet(ATOMIC, OFFHEAP_TIERED, true);
}
/**
* @throws Exception If failed.
*/
- private void doGet(CacheAtomicityMode atomicityMode, CacheMemoryMode memoryMode) throws Exception {
+ private void doGet(CacheAtomicityMode atomicityMode,
+ CacheMemoryMode memoryMode,
+ final boolean getAll) throws Exception {
Ignite srv = ignite(0);
Ignite client = ignite(1);
final IgniteCache cache = client.createCache(cacheConfiguration(atomicityMode, memoryMode));
+ final Map<Object, Object> data = new HashMap<>();
+
+ data.put(1, 1);
+ data.put(2, 2);
+
try {
// Get from compute closure.
{
- cache.put(1, 1);
+ cache.putAll(data);
hangLatch = new CountDownLatch(1);
processorStartLatch = new CountDownLatch(1);
IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
- cache.invoke(1, new HangEntryProcessor());
+ if (getAll)
+ cache.invokeAll(data.keySet(), new HangEntryProcessor());
+ else
+ cache.invoke(1, new HangEntryProcessor());
return null;
}
@@ -134,7 +150,14 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
assertTrue(wait);
- assertEquals(1, client.compute().affinityCall(cache.getName(), 1, new GetClosure(1, cache.getName())));
+ if (getAll) {
+ assertEquals(data, client.compute().affinityCall(cache.getName(), 1,
+ new GetAllClosure(data.keySet(), cache.getName())));
+ }
+ else {
+ assertEquals(1, client.compute().affinityCall(cache.getName(), 1,
+ new GetClosure(1, cache.getName())));
+ }
hangLatch.countDown();
@@ -147,14 +170,17 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
// Local get.
{
- cache.put(1, 1);
+ cache.putAll(data);
hangLatch = new CountDownLatch(1);
processorStartLatch = new CountDownLatch(1);
IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
- cache.invoke(1, new HangEntryProcessor());
+ if (getAll)
+ cache.invokeAll(data.keySet(), new HangEntryProcessor());
+ else
+ cache.invoke(1, new HangEntryProcessor());
return null;
}
@@ -165,7 +191,10 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
assertTrue(wait);
- assertEquals(1, srv.cache(cache.getName()).get(1));
+ if (getAll)
+ assertEquals(data, srv.cache(cache.getName()).getAll(data.keySet()));
+ else
+ assertEquals(1, srv.cache(cache.getName()).get(1));
hangLatch.countDown();
@@ -249,4 +278,32 @@ public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
return ignite.cache(cacheName).get(key);
}
}
+
+ /**
+ * Callable that looks up a cache by name on the Ignite instance it runs on and returns
+ * the result of {@code getAll} for the given keys.
+ */
+ public static class GetAllClosure implements IgniteCallable<Object> {
+ /** Ignite instance; annotated with {@code @IgniteInstanceResource}. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** Keys passed to {@code getAll}. */
+ private final Set<Object> keys;
+
+ /** Name of the cache to read from. */
+ private final String cacheName;
+
+ /**
+ * @param keys Keys.
+ * @param cacheName Cache name.
+ */
+ public GetAllClosure(Set<Object> keys, String cacheName) {
+ this.keys = keys;
+ this.cacheName = cacheName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ return ignite.cache(cacheName).getAll(keys);
+ }
+ }
}