You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/20 12:52:44 UTC
[04/50] ignite git commit: ignite-114 Load value from store for cache 'invoke'
ignite-114 Load value from store for cache 'invoke'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e10ffef0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e10ffef0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e10ffef0
Branch: refs/heads/ignite-3341
Commit: e10ffef00e91f88932c6478741eb00abe8992de4
Parents: 2b64e7c
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jun 14 10:05:20 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jun 14 10:05:20 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 34 +-
.../processors/cache/GridCacheEntryEx.java | 9 +-
.../processors/cache/GridCacheMapEntry.java | 100 +++--
.../processors/cache/GridCacheUtils.java | 3 +
.../distributed/dht/GridDhtCacheAdapter.java | 10 +-
.../distributed/dht/GridDhtLockFuture.java | 18 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 2 -
.../distributed/dht/GridDhtTxPrepareFuture.java | 19 +-
.../dht/GridPartitionedGetFuture.java | 2 -
.../dht/GridPartitionedSingleGetFuture.java | 2 -
.../dht/atomic/GridDhtAtomicCache.java | 8 -
.../dht/colocated/GridDhtColocatedCache.java | 4 +-
.../dht/preloader/GridDhtForceKeysFuture.java | 3 +-
.../dht/preloader/GridDhtPartitionDemander.java | 3 +-
.../distributed/near/GridNearGetFuture.java | 4 -
.../cache/distributed/near/GridNearTxLocal.java | 7 +-
.../local/atomic/GridLocalAtomicCache.java | 8 -
.../cache/transactions/IgniteTxAdapter.java | 2 -
.../cache/transactions/IgniteTxEntry.java | 24 +-
.../cache/transactions/IgniteTxHandler.java | 46 +++
.../transactions/IgniteTxLocalAdapter.java | 34 +-
.../datastreamer/DataStreamerImpl.java | 3 +-
.../processors/cache/GridCacheTestEntryEx.java | 5 +-
.../cache/IgniteCacheAbstractTest.java | 2 +-
...gniteCacheInvokeReadThroughAbstractTest.java | 382 +++++++++++++++++++
...iteCacheInvokeReadThroughSingleNodeTest.java | 106 +++++
.../cache/IgniteCacheInvokeReadThroughTest.java | 182 +++++----
.../IgniteCacheReadThroughStoreCallTest.java | 288 ++++++++++++++
.../IgniteCacheLoaderWriterAbstractTest.java | 10 +
.../testsuites/IgniteCacheTestSuite4.java | 4 +
30 files changed, 1110 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 6ccb506..6e647c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3655,8 +3655,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
GridCacheEntryEx entry = entryEx(key, false);
try {
- entry.initialValue(cacheVal, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
- replicate ? DR_LOAD : DR_NONE);
+ entry.initialValue(cacheVal,
+ ver,
+ ttl,
+ CU.EXPIRE_TIME_CALCULATE,
+ false,
+ topVer,
+ replicate ? DR_LOAD : DR_NONE,
+ true);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to put cache value: " + entry, e);
@@ -5102,19 +5108,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
throws IgniteCheckedException, GridCacheEntryRemovedException
{
CacheObject val = entry.innerGet(
- null,
- null,
- false,
- false,
- false,
- true,
- false,
- false,
- false,
- null,
- null,
- null,
- null,
+ /*ver*/null,
+ /*tx*/null,
+ /*swap*/false,
+ /*readThrough*/false,
+ /*metrics*/false,
+ /*evt*/false,
+ /*tmp*/false,
+ /*subjId*/null,
+ /*transformClo*/null,
+ /*taskName*/null,
+ /*expiryPlc*/null,
!deserializeBinary);
if (val == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 31bd887..c5fdde6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -278,9 +278,6 @@ public interface GridCacheEntryEx {
* @param tx Ongoing transaction (possibly null).
* @param readSwap Flag indicating whether to check swap memory.
* @param readThrough Flag indicating whether to read through.
- * @param failFast If {@code true}, then throw {@link GridCacheFilterFailedException} if
- * filter didn't pass.
- * @param unmarshal Unmarshal flag.
* @param updateMetrics If {@code true} then metrics should be updated.
* @param evt Flag to signal event notification.
* @param tmp If {@code true} can return temporary instance which is valid while entry lock is held,
@@ -299,8 +296,6 @@ public interface GridCacheEntryEx {
@Nullable IgniteInternalTx tx,
boolean readSwap,
boolean readThrough,
- boolean failFast,
- boolean unmarshal,
boolean updateMetrics,
boolean evt,
boolean tmp,
@@ -677,6 +672,7 @@ public interface GridCacheEntryEx {
* @param preload Flag indicating whether entry is being preloaded.
* @param topVer Topology version.
* @param drType DR type.
+ * @param fromStore {@code True} if value was loaded from store.
* @return {@code True} if initial value was set.
* @throws IgniteCheckedException In case of error.
* @throws GridCacheEntryRemovedException If entry was removed.
@@ -687,7 +683,8 @@ public interface GridCacheEntryEx {
long expireTime,
boolean preload,
AffinityTopologyVersion topVer,
- GridDrType drType) throws IgniteCheckedException, GridCacheEntryRemovedException;
+ GridDrType drType,
+ boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
* Sets new value if current version is <tt>0</tt> using swap entry data.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index f442202..b5359f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -750,8 +750,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable IgniteInternalTx tx,
boolean readSwap,
boolean readThrough,
- boolean failFast,
- boolean unmarshal,
boolean updateMetrics,
boolean evt,
boolean tmp,
@@ -766,7 +764,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
readSwap,
readThrough,
evt,
- unmarshal,
updateMetrics,
tmp,
subjId,
@@ -796,7 +793,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
readSwap,
false,
evt,
- unmarshal,
updateMetrics,
false,
subjId,
@@ -815,7 +811,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean readSwap,
boolean readThrough,
boolean evt,
- boolean unmarshal,
boolean updateMetrics,
boolean tmp,
UUID subjId,
@@ -1985,6 +1980,51 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (isStartVersion())
unswap(retval, false);
+ // Prepare old value.
+ oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
+
+ // Possibly read value from store.
+ boolean readFromStore = false;
+
+ Object old0 = null;
+
+ if (readThrough && needVal && oldVal == null && (cctx.readThrough() &&
+ (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
+ old0 = readThrough(null, key, false, subjId, taskName);
+
+ oldVal = cctx.toCacheObject(old0);
+
+ readFromStore = true;
+
+ // Detach value before index update.
+ oldVal = cctx.kernalContext().cacheObjects().prepareForCache(oldVal, cctx);
+
+ // Calculate initial TTL and expire time.
+ long initTtl;
+ long initExpireTime;
+
+ if (expiryPlc != null && oldVal != null) {
+ IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
+
+ initTtl = initTtlAndExpireTime.get1();
+ initExpireTime = initTtlAndExpireTime.get2();
+ }
+ else {
+ initTtl = CU.TTL_ETERNAL;
+ initExpireTime = CU.EXPIRE_TIME_ETERNAL;
+ }
+
+ if (oldVal != null)
+ updateIndex(oldVal, initExpireTime, ver, null);
+ else
+ clearIndex(null);
+
+ update(oldVal, initExpireTime, initTtl, ver, true);
+
+ if (deletedUnlocked() && oldVal != null && !isInternal())
+ deletedUnlocked(false);
+ }
+
Object transformClo = null;
// Request-level conflict resolution is needed, i.e. we do not know who will win in advance.
@@ -2192,51 +2232,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
"Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']';
}
- // Prepare old value and value bytes.
- oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
-
- // Possibly read value from store.
- boolean readFromStore = false;
-
- Object old0 = null;
-
- if (readThrough && needVal && oldVal == null && (cctx.readThrough() &&
- (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
- old0 = readThrough(null, key, false, subjId, taskName);
-
- oldVal = cctx.toCacheObject(old0);
-
- readFromStore = true;
-
- // Detach value before index update.
- oldVal = cctx.kernalContext().cacheObjects().prepareForCache(oldVal, cctx);
-
- // Calculate initial TTL and expire time.
- long initTtl;
- long initExpireTime;
-
- if (expiryPlc != null && oldVal != null) {
- IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
-
- initTtl = initTtlAndExpireTime.get1();
- initExpireTime = initTtlAndExpireTime.get2();
- }
- else {
- initTtl = CU.TTL_ETERNAL;
- initExpireTime = CU.EXPIRE_TIME_ETERNAL;
- }
-
- if (oldVal != null)
- updateIndex(oldVal, initExpireTime, ver, null);
- else
- clearIndex(null);
-
- update(oldVal, initExpireTime, initTtl, ver, true);
-
- if (deletedUnlocked() && oldVal != null && !isInternal())
- deletedUnlocked(false);
- }
-
// Apply metrics.
if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) {
// PutIfAbsent methods mustn't update hit/miss statistics
@@ -3435,7 +3430,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
long expireTime,
boolean preload,
AffinityTopologyVersion topVer,
- GridDrType drType)
+ GridDrType drType,
+ boolean fromStore)
throws IgniteCheckedException, GridCacheEntryRemovedException {
synchronized (this) {
checkObsolete();
@@ -3488,7 +3484,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
cctx.dataStructures().onEntryUpdated(key, false, true);
}
- if (cctx.store().isLocal()) {
+ if (!fromStore && cctx.store().isLocal()) {
if (val != null)
cctx.store().put(null, key, val, ver);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 6ce5735..87c4a3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -155,6 +155,9 @@ public class GridCacheUtils {
/** Keep serialized flag. */
public static final int KEEP_BINARY_FLAG_MASK = 0x2;
+ /** Flag indicating that old value for 'invoke' operation was non null on primary node. */
+ public static final int OLD_VAL_ON_PRIMARY = 0x4;
+
/** Empty predicate array. */
private static final IgnitePredicate[] EMPTY = new IgnitePredicate[0];
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 5cc079d..ed1996a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -564,8 +564,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
entry = entryEx(key, false);
- entry.initialValue(cacheVal, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
- replicate ? DR_LOAD : DR_NONE);
+ entry.initialValue(cacheVal,
+ ver,
+ ttl,
+ CU.EXPIRE_TIME_CALCULATE,
+ false,
+ topVer,
+ replicate ? DR_LOAD : DR_NONE,
+ false);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to put cache value: " + entry, e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 5659436..0a3513a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -1053,7 +1053,14 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
try {
CacheObject val0 = cctx.toCacheObject(val);
- entry0.initialValue(val0, ver, 0, 0, false, topVer, GridDrType.DR_LOAD);
+ entry0.initialValue(val0,
+ ver,
+ 0,
+ 0,
+ false,
+ topVer,
+ GridDrType.DR_LOAD,
+ true);
}
catch (GridCacheEntryRemovedException e) {
assert false : "Should not get removed exception while holding lock on entry " +
@@ -1210,8 +1217,13 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
try {
GridCacheEntryEx entry = cctx.cache().entryEx(info.key(), topVer);
- if (entry.initialValue(info.value(), info.version(), info.ttl(),
- info.expireTime(), true, topVer, replicate ? DR_PRELOAD : DR_NONE)) {
+ if (entry.initialValue(info.value(),
+ info.version(),
+ info.ttl(),
+ info.expireTime(),
+ true, topVer,
+ replicate ? DR_PRELOAD : DR_NONE,
+ false)) {
if (rec && !entry.isInternal())
cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
(IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index b2da39c..0ca02c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -1181,8 +1181,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
tx,
/*swap*/true,
/*read-through*/false,
- /*fail-fast.*/false,
- /*unmarshal*/false,
/*update-metrics*/true,
/*event notification*/req.returnValue(i),
/*temporary*/false,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index e17cbbc..cedfee5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -333,9 +333,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
cached.unswap(retVal);
- boolean readThrough = (retVal || hasFilters) &&
- cacheCtx.config().isLoadPreviousValue() &&
- !txEntry.skipStore();
+ boolean readThrough = !txEntry.skipStore() &&
+ (txEntry.op() == TRANSFORM || ((retVal || hasFilters) && cacheCtx.config().isLoadPreviousValue()));
boolean evt = retVal || txEntry.op() == TRANSFORM;
@@ -351,8 +350,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
tx,
/*swap*/true,
readThrough,
- /*fail fast*/false,
- /*unmarshal*/true,
/*metrics*/retVal,
/*event*/evt,
/*tmp*/false,
@@ -376,6 +373,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
boolean modified = false;
+ txEntry.oldValueOnPrimary(val != null);
+
for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(key, val,
txEntry.cached().version(), keepBinary, txEntry.cached());
@@ -1559,8 +1558,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
GridDrType drType = cacheCtx.isDrEnabled() ? GridDrType.DR_PRELOAD : GridDrType.DR_NONE;
try {
- if (entry.initialValue(info.value(), info.version(),
- info.ttl(), info.expireTime(), true, topVer, drType)) {
+ if (entry.initialValue(info.value(),
+ info.version(),
+ info.ttl(),
+ info.expireTime(),
+ true,
+ topVer,
+ drType,
+ false)) {
if (rec && !entry.isInternal())
cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
(IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 7d2c5db..b3a485c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -473,8 +473,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
null,
/*swap*/true,
/*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
/**update-metrics*/false,
/*event*/!skipVals,
/*temporary*/false,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index b0b9d7c..7788b63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -397,8 +397,6 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
null,
/*swap*/true,
/*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
/**update-metrics*/false,
/*event*/!skipVals,
/*temporary*/false,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 08f1f30..ad87add 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1219,8 +1219,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
/*swap*/true,
/*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
/**update-metrics*/false,
/*event*/!skipVals,
/*temporary*/false,
@@ -1695,8 +1693,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
/*read swap*/true,
/*read through*/true,
- /*fail fast*/false,
- /*unmarshal*/true,
/*metrics*/true,
/*event*/true,
/*temporary*/true,
@@ -1855,8 +1851,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
/*read swap*/true,
/*read through*/ctx.loadPreviousValue(),
- /*fail fast*/false,
- /*unmarshal*/true,
/*metrics*/true,
/*event*/true,
/*temporary*/true,
@@ -1902,8 +1896,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
/*read swap*/true,
/*read through*/ctx.loadPreviousValue(),
- /*fail fast*/false,
- /*unmarshal*/true,
/*metrics*/true,
/*event*/true,
/*temporary*/true,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 35831a3..e8e8298 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -507,8 +507,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
null,
/*swap*/true,
/*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
/**update-metrics*/false,
/*event*/!skipVals,
/*temporary*/false,
@@ -1057,7 +1055,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync(cacheCtx,
keys,
- tx.implicit(),
+ retval,
txRead,
accessTtl,
skipStore,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 4da1f38..7cbd77c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -552,7 +552,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
info.expireTime(),
true,
topVer,
- replicate ? DR_PRELOAD : DR_NONE
+ replicate ? DR_PRELOAD : DR_NONE,
+ false
)) {
if (rec && !entry.isInternal())
cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 6181d9e..6fde827 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -689,7 +689,8 @@ public class GridDhtPartitionDemander {
entry.expireTime(),
true,
topVer,
- cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+ cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE,
+ false
)) {
cctx.evicts().touch(cached, topVer); // Start tracking.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index edac92c..73b9d38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -457,8 +457,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
tx,
/*swap*/false,
/*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
/*metrics*/true,
/*events*/!skipVals,
/*temporary*/false,
@@ -599,8 +597,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
tx,
/*swap*/true,
/*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
/*update-metrics*/false,
/*events*/!nearRead && !skipVals,
/*temporary*/false,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 3ff165b..bea2ce5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1138,16 +1138,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/**
* @param cacheCtx Cache context.
* @param keys Keys.
- * @param implicit Implicit flag.
+ * @param retval Return value flag.
* @param read Read flag.
* @param accessTtl Access ttl.
* @param <K> Key type.
* @param skipStore Skip store flag.
+ * @param keepBinary Keep binary flag.
* @return Future with respond.
*/
public <K> IgniteInternalFuture<GridCacheReturn> lockAllAsync(GridCacheContext cacheCtx,
final Collection<? extends K> keys,
- boolean implicit,
+ boolean retval,
boolean read,
long accessTtl,
boolean skipStore,
@@ -1181,7 +1182,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
this,
isInvalidate(),
read,
- /*retval*/false,
+ retval,
isolation,
accessTtl,
CU.empty0(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index d2d8d62..7caea23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -628,8 +628,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
null,
/*swap*/swapOrOffheap,
/*read-through*/false,
- /*fail-fast*/false,
- /*unmarshal*/true,
/**update-metrics*/true,
/**event*/!skipVals,
/**temporary*/false,
@@ -1187,8 +1185,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
null,
/*swap*/true,
/*read-through*/true,
- /*fail-fast*/false,
- /*unmarshal*/true,
/**update-metrics*/true,
/**event*/true,
/**temporary*/true,
@@ -1311,8 +1307,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
null,
/*swap*/true,
/*read-through*/ctx.loadPreviousValue(),
- /*fail-fast*/false,
- /*unmarshal*/true,
/**update-metrics*/true,
/**event*/true,
/**temporary*/true,
@@ -1348,8 +1342,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
null,
/*swap*/true,
/*read-through*/ctx.loadPreviousValue(),
- /*fail-fast*/false,
- /*unmarshal*/true,
/**update-metrics*/true,
/**event*/true,
/**temporary*/true,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 66d4df0..3bb3a17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1492,8 +1492,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
this,
/*swap*/false,
/*read through*/false,
- /*fail fast*/true,
- /*unmarshal*/true,
/*metrics*/metrics,
/*event*/recordEvt,
/*temporary*/true,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 90aa8d2..3258ce9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -57,6 +58,7 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.OLD_VAL_ON_PRIMARY;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
/**
@@ -185,9 +187,11 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
private byte[] expiryPlcBytes;
/**
- * Additional flags.
- * GridCacheUtils.SKIP_STORE_FLAG_MASK - for skipStore flag value.
- * GridCacheUtils.KEEP_BINARY_FLAG_MASK - for withKeepBinary flag.
+ * Additional flags:
+ * <ul>
+ * <li>{@link GridCacheUtils#SKIP_STORE_FLAG_MASK} - for skipStore flag value.</li>
+ * <li>{@link GridCacheUtils#KEEP_BINARY_FLAG_MASK} - for withKeepBinary flag.</li>
+ * </ul>
*/
private byte flags;
@@ -483,6 +487,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
}
/**
+ * @param oldValOnPrimary {@code True} if old value for 'invoke' operation was non-null on primary node.
+ */
+ public void oldValueOnPrimary(boolean oldValOnPrimary) {
+ setFlag(oldValOnPrimary, OLD_VAL_ON_PRIMARY);
+ }
+
+ /**
+ * @return {@code True} if old value for 'invoke' operation was non-null on primary node.
+ */
+ public boolean oldValueOnPrimary() {
+ return isFlag(OLD_VAL_ON_PRIMARY);
+ }
+
+ /**
* Sets keep binary flag value.
*
* @param keepBinary Keep binary flag value.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index b25baf8..5b09760 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -25,9 +25,11 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -73,6 +75,7 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
import static org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture.FINISH_NEAR_ONE_PHASE_SINCE;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
@@ -1165,6 +1168,49 @@ public class IgniteTxHandler {
if (info != null && !info.isNew() && !info.isDeleted())
res.addPreloadEntry(info);
}
+
+ if (cacheCtx.readThroughConfigured() &&
+ !entry.skipStore() &&
+ entry.op() == TRANSFORM &&
+ entry.oldValueOnPrimary() &&
+ !entry.hasValue()) {
+ while (true) {
+ try {
+ GridCacheEntryEx cached = entry.cached();
+
+ if (cached == null)
+ cached = cacheCtx.cache().entryEx(entry.key(), req.topologyVersion());
+
+ CacheObject val = cached.innerGet(
+ /*ver*/null,
+ tx,
+ /*readSwap*/true,
+ /*readThrough*/false,
+ /*updateMetrics*/false,
+ /*evt*/false,
+ /*tmp*/false,
+ tx.subjectId(),
+ /*transformClo*/null,
+ tx.resolveTaskName(),
+ /*expiryPlc*/null,
+ /*keepBinary*/true);
+
+ if (val == null)
+ val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));
+
+ if (val != null)
+ entry.readValue(val);
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Got entry removed exception, will retry: " + entry.txKey());
+
+ entry.cached(null);
+ }
+ }
+ }
}
catch (GridDhtInvalidPartitionException e) {
tx.addInvalidPartition(cacheCtx, e.partition());
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 8eea8dd..28ecda5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1255,8 +1255,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
this,
/*swap*/true,
/*read-through*/false,
- /*fail fast*/true,
- /*unmarshal*/true,
/*metrics*/true,
/*event*/!skipVals,
/*temporary*/false,
@@ -1340,9 +1338,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
null,
this,
/*swap*/true,
- /*no read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
+ /*read-through*/false,
/*metrics*/true,
/*event*/true,
/*temporary*/false,
@@ -1687,8 +1683,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
IgniteTxLocalAdapter.this,
cacheCtx.isSwapOrOffheapEnabled(),
/*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
/*metrics*/true,
/*events*/!skipVals,
/*temporary*/false,
@@ -1992,7 +1986,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
needReadVer,
singleRmv,
hasFilters,
- skipStore,
+ /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
retval,
keepBinary);
}
@@ -2153,7 +2147,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
needReadVer,
singleRmv,
hasFilters,
- skipStore,
+ /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
retval,
keepBinary);
}
@@ -2173,7 +2167,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
* @param needReadVer Read version flag.
* @param singleRmv {@code True} for single remove operation.
* @param hasFilters {@code True} if filters not empty.
- * @param skipStore Skip store flag.
+ * @param readThrough Read through flag.
* @param retval Return value flag.
* @return Load future.
*/
@@ -2185,7 +2179,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
final boolean needReadVer,
final boolean singleRmv,
final boolean hasFilters,
- final boolean skipStore,
+ final boolean readThrough,
final boolean retval,
final boolean keepBinary) {
GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
@@ -2218,6 +2212,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (e.op() == TRANSFORM) {
GridCacheVersion ver;
+ e.readValue(cacheVal);
+
try {
ver = e.cached().version();
}
@@ -2243,7 +2239,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
return loadMissing(
cacheCtx,
- /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore,
+ readThrough,
/*async*/true,
keys,
/*skipVals*/singleRmv,
@@ -2354,8 +2350,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
this,
/*swap*/false,
/*read-through*/false,
- /*fail-fast*/false,
- /*unmarshal*/retval,
/*metrics*/retval,
/*events*/retval,
/*temporary*/false,
@@ -2652,16 +2646,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (retval || invoke) {
if (!cacheCtx.isNear()) {
if (!hasPrevVal) {
- boolean readThrough =
- (invoke || cacheCtx.loadPreviousValue()) && !txEntry.skipStore();
+ // For a non-local cache the value should be read from the store after the lock is acquired on the primary node.
+ boolean readThrough = cacheCtx.isLocal() &&
+ (invoke || cacheCtx.loadPreviousValue()) &&
+ !txEntry.skipStore();
v = cached.innerGet(
null,
this,
/*swap*/true,
readThrough,
- /*failFast*/false,
- /*unmarshal*/true,
/*metrics*/!invoke,
/*event*/!invoke && !dht(),
/*temporary*/false,
@@ -2889,7 +2883,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
timeout,
this,
- false,
+ /*read*/entryProcessor != null, // Needed to force load from store.
retval,
isolation,
isInvalidate(),
@@ -3069,7 +3063,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
timeout,
this,
- false,
+ /*read*/invokeMap != null, // Needed to force load from store.
retval,
isolation,
isInvalidate(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 4599060..9dc6a7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1621,7 +1621,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
expiryTime,
false,
topVer,
- GridDrType.DR_LOAD);
+ GridDrType.DR_LOAD,
+ false);
cctx.evicts().touch(entry, topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 09cf7c8..ebca870 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -432,8 +432,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
@Nullable IgniteInternalTx tx,
boolean readSwap,
boolean readThrough,
- boolean failFast,
- boolean unmarshal,
boolean updateMetrics,
boolean evt,
boolean tmp,
@@ -668,7 +666,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
long expireTime,
boolean preload,
AffinityTopologyVersion topVer,
- GridDrType drType
+ GridDrType drType,
+ boolean fromStore
) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
index ce60232..45b6e9d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
@@ -54,7 +54,7 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest {
private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/** */
- protected static final Map<Object, Object> storeMap = new ConcurrentHashMap8<>();
+ public static final Map<Object, Object> storeMap = new ConcurrentHashMap8<>();
/**
* @return Grids count to start.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java
new file mode 100644
index 0000000..294ebea
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.cache.configuration.Factory;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
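+ * Base class for tests checking that cache 'invoke' sees the value previously put into the cache store.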
+ */
+public abstract class IgniteCacheInvokeReadThroughAbstractTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static volatile boolean failed;
+
+ /** */
+ protected boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ failed = false;
+
+ startNodes();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ IgniteCacheAbstractTest.storeMap.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @return Store factory.
+ */
+ protected Factory<CacheStore> cacheStoreFactory() {
+ return new IgniteCacheAbstractTest.TestStoreFactory();
+ }
+
+ /**
+ * @param data Data.
+ * @param cacheName Cache name.
+ * @throws Exception If failed.
+ */
+ protected void putDataInStore(Map<Object, Object> data, String cacheName) throws Exception {
+ IgniteCacheAbstractTest.storeMap.putAll(data);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ protected abstract void startNodes() throws Exception;
+
+ /**
+ * @param ccfg Cache configuration.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ protected void invokeReadThrough(CacheConfiguration ccfg) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ ignite0.createCache(ccfg);
+
+ try {
+ int key = 0;
+
+ for (Ignite node : G.allGrids()) {
+ if (node.configuration().isClientMode() && ccfg.getNearConfiguration() != null)
+ node.createNearCache(ccfg.getName(), ccfg.getNearConfiguration());
+ }
+
+ for (Ignite node : G.allGrids()) {
+ log.info("Test for node: " + node.name());
+
+ IgniteCache<Object, Object> cache = node.cache(ccfg.getName());
+
+ for (int i = 0; i < 50; i++)
+ checkReadThrough(cache, key++, null, null);
+
+ Set<Object> keys = new HashSet<>();
+
+ for (int i = 0; i < 5; i++)
+ keys.add(key++);
+
+ checkReadThroughInvokeAll(cache, keys, null, null);
+
+ keys = new HashSet<>();
+
+ for (int i = 0; i < 100; i++)
+ keys.add(key++);
+
+ checkReadThroughInvokeAll(cache, keys, null, null);
+
+ if (ccfg.getAtomicityMode() == TRANSACTIONAL) {
+ for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ log.info("Test tx [concurrency=" + concurrency + ", isolation=" + isolation + ']');
+
+ for (int i = 0; i < 50; i++)
+ checkReadThrough(cache, key++, concurrency, isolation);
+
+ keys = new HashSet<>();
+
+ for (int i = 0; i < 5; i++)
+ keys.add(key++);
+
+ checkReadThroughInvokeAll(cache, keys, concurrency, isolation);
+
+ keys = new HashSet<>();
+
+ for (int i = 0; i < 100; i++)
+ keys.add(key++);
+
+ checkReadThroughInvokeAll(cache, keys, concurrency, isolation);
+ }
+ }
+
+ for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ log.info("Test tx2 [concurrency=" + concurrency + ", isolation=" + isolation + ']');
+
+ for (int i = 0; i < 50; i++)
+ checkReadThroughGetAndInvoke(cache, key++, concurrency, isolation);
+ }
+ }
+ }
+ }
+
+ ignite0.cache(ccfg.getName()).removeAll();
+ }
+ finally {
+ ignite0.destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param key Key.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @throws Exception If failed.
+ */
+ private void checkReadThrough(IgniteCache<Object, Object> cache,
+ Object key,
+ @Nullable TransactionConcurrency concurrency,
+ @Nullable TransactionIsolation isolation) throws Exception {
+ putDataInStore(Collections.singletonMap(key, key), cache.getName());
+
+ Transaction tx = isolation != null ? cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation)
+ : null;
+
+ try {
+ Object ret = cache.invoke(key, new TestEntryProcessor());
+
+ assertEquals(key, ret);
+
+ if (tx != null)
+ tx.commit();
+ }
+ finally {
+ if (tx != null)
+ tx.close();
+ }
+
+ checkValue(cache.getName(), key, (Integer)key + 1);
+ }
+
+ /**
+ * @param cache Cache.
+ * @param key Key.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @throws Exception If failed.
+ */
+ private void checkReadThroughGetAndInvoke(IgniteCache<Object, Object> cache,
+ Object key,
+ TransactionConcurrency concurrency,
+ TransactionIsolation isolation) throws Exception {
+ putDataInStore(Collections.singletonMap(key, key), cache.getName());
+
+ try (Transaction tx = cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation)) {
+ cache.get(key);
+
+ Object ret = cache.invoke(key, new TestEntryProcessor());
+
+ assertEquals(key, ret);
+
+ tx.commit();
+ }
+
+ checkValue(cache.getName(), key, (Integer)key + 1);
+ }
+
+ /**
+ * @param cache Cache.
+ * @param keys Keys.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @throws Exception If failed.
+ */
+ private void checkReadThroughInvokeAll(IgniteCache<Object, Object> cache,
+ Set<Object> keys,
+ @Nullable TransactionConcurrency concurrency,
+ @Nullable TransactionIsolation isolation) throws Exception {
+ Map<Object, Object> data = U.newHashMap(keys.size());
+
+ for (Object key : keys)
+ data.put(key, key);
+
+ putDataInStore(data, cache.getName());
+
+ Transaction tx = isolation != null ? cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation)
+ : null;
+
+ try {
+ Map<Object, EntryProcessorResult<Object>> ret = cache.invokeAll(keys, new TestEntryProcessor());
+
+ assertEquals(ret.size(), keys.size());
+
+ for (Object key : keys) {
+ EntryProcessorResult<Object> res = ret.get(key);
+
+ assertNotNull(res);
+ assertEquals(key, res.get());
+ }
+
+ if (tx != null)
+ tx.commit();
+ }
+ finally {
+ if (tx != null)
+ tx.close();
+ }
+
+ for (Object key : keys)
+ checkValue(cache.getName(), key, (Integer)key + 1);
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param key Key.
+ * @param val Expected value.
+ */
+ private void checkValue(String cacheName, Object key, Object val) {
+ for (Ignite ignite : G.allGrids()) {
+ assertEquals("Unexpected value for node: " + ignite.name(),
+ val,
+ ignite.cache(cacheName).get(key));
+ }
+
+ assertFalse(failed);
+ }
+
+ /**
+ * @param cacheMode Cache mode.
+ * @param atomicityMode Atomicity mode.
+ * @param memoryMode Memory mode.
+ * @param backups Number of backups.
+ * @param nearCache Near cache flag.
+ * @return Cache configuration.
+ */
+ @SuppressWarnings("unchecked")
+ protected CacheConfiguration cacheConfiguration(CacheMode cacheMode,
+ CacheAtomicityMode atomicityMode,
+ CacheMemoryMode memoryMode,
+ int backups,
+ boolean nearCache) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setReadThrough(true);
+ ccfg.setWriteThrough(true);
+ ccfg.setCacheStoreFactory(cacheStoreFactory());
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
+ ccfg.setMemoryMode(memoryMode);
+
+ if (nearCache)
+ ccfg.setNearConfiguration(new NearCacheConfiguration());
+
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(backups);
+
+ return ccfg;
+ }
+
+ /**
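+ * Entry processor that expects the entry to exist with a value equal to its key, sets the value to {@code val + 1} and returns the old value.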
+ */
+ static class TestEntryProcessor implements EntryProcessor<Object, Object, Object> {
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Object, Object> entry, Object... args) {
+ if (!entry.exists()) {
+ failed = true;
+
+ fail();
+ }
+
+ Integer val = (Integer)entry.getValue();
+
+ if (!val.equals(entry.getKey())) {
+ failed = true;
+
+ assertEquals(val, entry.getKey());
+ }
+
+ entry.setValue(val + 1);
+
+ return val;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java
new file mode 100644
index 0000000..b451abf
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Runs the 'invoke' read-through checks on a single started node.
+ */
+public class IgniteCacheInvokeReadThroughSingleNodeTest extends IgniteCacheInvokeReadThroughAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void startNodes() throws Exception {
+ startGrid(0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomic() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomic_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 1, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomicNearCache() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, true));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomicReplicated() throws Exception {
+ invokeReadThrough(cacheConfiguration(REPLICATED, ATOMIC, ONHEAP_TIERED, 0, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomicLocal() throws Exception {
+ invokeReadThrough(cacheConfiguration(LOCAL, ATOMIC, ONHEAP_TIERED, 0, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 1, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTxNearCache() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, true));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTxReplicated() throws Exception {
+ invokeReadThrough(cacheConfiguration(REPLICATED, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTxLocal() throws Exception {
+ invokeReadThrough(cacheConfiguration(LOCAL, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
index 2464e81..9578227 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
@@ -17,111 +17,163 @@
package org.apache.ignite.internal.processors.cache;
-import javax.cache.configuration.Factory;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.MutableEntry;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.store.CacheStore;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
/**
*
*/
-public class IgniteCacheInvokeReadThroughTest extends IgniteCacheAbstractTest {
+public class IgniteCacheInvokeReadThroughTest extends IgniteCacheInvokeReadThroughAbstractTest {
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-114");
- }
+ @Override protected void startNodes() throws Exception {
+ startGridsMultiThreaded(4);
- /** */
- private static volatile boolean failed;
+ client = true;
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 3;
+ startGrid(4);
}
- /** {@inheritDoc} */
- @Override protected CacheMode cacheMode() {
- return PARTITIONED;
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomic0() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 0, false));
}
- /** {@inheritDoc} */
- @Override protected CacheAtomicityMode atomicityMode() {
- return TRANSACTIONAL;
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomic1() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, false));
}
- /** {@inheritDoc} */
- @Override protected NearCacheConfiguration nearConfiguration() {
- return null;
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomic2() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 2, false));
}
- /** {@inheritDoc} */
- @Override protected Factory<CacheStore> cacheStoreFactory() {
- return new TestStoreFactory();
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomicNearCache() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, true));
}
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- failed = false;
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomicReplicated() throws Exception {
+ invokeReadThrough(cacheConfiguration(REPLICATED, ATOMIC, ONHEAP_TIERED, 0, false));
}
/**
* @throws Exception If failed.
*/
- public void testInvokeReadThrough() throws Exception {
- IgniteCache<Integer, Integer> cache = jcache(0);
-
- checkReadThrough(cache, primaryKey(cache));
+ public void testInvokeReadThroughAtomic0_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 0, false));
+ }
- checkReadThrough(cache, backupKey(cache));
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomic1_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 1, false));
+ }
- checkReadThrough(cache, nearKey(cache));
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomic2_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 2, false));
}
/**
- * @param cache Cache.
- * @param key Key.
+ * @throws Exception If failed.
*/
- private void checkReadThrough(IgniteCache<Integer, Integer> cache, Integer key) {
- log.info("Test [key=" + key + ']');
+ public void testInvokeReadThroughAtomicNearCache_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 1, true));
+ }
- storeMap.put(key, key);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughAtomicReplicated_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(REPLICATED, ATOMIC, OFFHEAP_TIERED, 0, false));
+ }
- Object ret = cache.invoke(key, new EntryProcessor<Integer, Integer, Object>() {
- @Override public Object process(MutableEntry<Integer, Integer> entry, Object... args) {
- if (!entry.exists()) {
- failed = true;
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx0() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+ }
- fail();
- }
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx1() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, false));
+ }
- Integer val = entry.getValue();
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx2() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 2, false));
+ }
- if (!val.equals(entry.getKey())) {
- failed = true;
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTxNearCache() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, true));
+ }
- assertEquals(val, entry.getKey());
- }
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTxReplicated() throws Exception {
+ invokeReadThrough(cacheConfiguration(REPLICATED, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+ }
- entry.setValue(val + 1);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx0_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 0, false));
+ }
- return val;
- }
- });
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx1_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 1, false));
+ }
- assertEquals(key, ret);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTx2_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 2, false));
+ }
- for (int i = 0; i < gridCount(); i++)
- assertEquals("Unexpected value for node: " + i, key + 1, jcache(i).get(key));
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTxNearCache_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 1, true));
+ }
- assertFalse(failed);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvokeReadThroughTxReplicated_Offheap() throws Exception {
+ invokeReadThrough(cacheConfiguration(REPLICATED, TRANSACTIONAL, OFFHEAP_TIERED, 0, false));
}
}