You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/02/03 10:06:02 UTC
[2/4] ignite git commit: 2224 (cherry picked from commit 0113506)
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index ffe5373..104af94 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -174,6 +174,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
txMapping = new GridDhtTxMapping();
for (IgniteTxEntry txEntry : tx.allEntries()) {
+ txEntry.clearEntryReadVersion();
+
GridCacheContext cacheCtx = txEntry.context();
List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index a09dec0..a3130cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -122,7 +122,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
String taskName,
final boolean deserializeBinary,
final boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ final boolean needVer
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -146,7 +147,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
deserializeBinary,
skipVals,
false,
- skipStore);
+ skipStore,
+ needVer);
}
}, opCtx);
}
@@ -162,7 +164,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
skipVals ? null : opCtx != null ? opCtx.expiry() : null,
skipVals,
skipStore,
- canRemap);
+ canRemap,
+ needVer);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 6130ead..fed3e33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -477,7 +477,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override @Nullable public V get(K key, boolean deserializeBinary) throws IgniteCheckedException {
+ @Override @Nullable public V get(K key, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException {
String taskName = ctx.kernalContext().job().currentTaskName();
Map<K, V> m = getAllInternal(Collections.singleton(key),
@@ -485,7 +485,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
ctx.readThrough(),
taskName,
deserializeBinary,
- false);
+ false,
+ needVer);
assert m.isEmpty() || m.size() == 1 : m.size();
@@ -494,7 +495,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public final Map<K, V> getAll(Collection<? extends K> keys, boolean deserializeBinary)
+ @Override public final Map<K, V> getAll(Collection<? extends K> keys, boolean deserializeBinary, boolean needVer)
throws IgniteCheckedException {
A.notNull(keys, "keys");
@@ -505,7 +506,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
ctx.readThrough(),
taskName,
deserializeBinary,
- false);
+ false,
+ needVer);
}
@@ -519,7 +521,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
final String taskName,
final boolean deserializeBinary,
final boolean skipVals,
- boolean canRemap
+ boolean canRemap,
+ final boolean needVer
) {
A.notNull(keys, "keys");
@@ -528,7 +531,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
return asyncOp(new Callable<Map<K, V>>() {
@Override public Map<K, V> call() throws Exception {
- return getAllInternal(keys, swapOrOffheap, storeEnabled, taskName, deserializeBinary, skipVals);
+ return getAllInternal(keys, swapOrOffheap, storeEnabled, taskName, deserializeBinary, skipVals, needVer);
}
});
}
@@ -542,6 +545,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
* @param taskName Task name.
* @param deserializeBinary Deserialize binary .
* @param skipVals Skip value flag.
+ * @param needVer Need version.
* @return Key-value map.
* @throws IgniteCheckedException If failed.
*/
@@ -551,7 +555,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
boolean storeEnabled,
String taskName,
boolean deserializeBinary,
- boolean skipVals
+ boolean skipVals,
+ boolean needVer
) throws IgniteCheckedException {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -584,24 +589,65 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
entry = swapOrOffheap ? entryEx(cacheKey) : peekEx(cacheKey);
if (entry != null) {
- CacheObject v = entry.innerGet(null,
- /*swap*/swapOrOffheap,
- /*read-through*/false,
- /*fail-fast*/false,
- /*unmarshal*/true,
- /**update-metrics*/true,
- /**event*/!skipVals,
- /**temporary*/false,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary);
+ CacheObject v ;
+ GridCacheVersion ver;
- if (v != null)
- ctx.addResult(vals, cacheKey, v, skipVals, false, deserializeBinary, true);
- else
- success = false;
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ null,
+ /*swap*/swapOrOffheap,
+ /*unmarshal*/true,
+ /**update-metrics*/false,
+ /*event*/!skipVals,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+
+ if (res != null) {
+ v = res.get1();
+ ver = res.get2();
+
+ ctx.addResult(
+ vals,
+ cacheKey,
+ v,
+ skipVals,
+ false,
+ deserializeBinary,
+ true,
+ ver);
+ }else
+ success = false;
+ }
+ else {
+ v = entry.innerGet(null,
+ /*swap*/swapOrOffheap,
+ /*read-through*/false,
+ /*fail-fast*/false,
+ /*unmarshal*/true,
+ /**update-metrics*/true,
+ /**event*/!skipVals,
+ /**temporary*/false,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+
+ if (v != null) {
+ ctx.addResult(vals,
+ cacheKey,
+ v,
+ skipVals,
+ false,
+ deserializeBinary,
+ true);
+ }
+ else
+ success = false;
+ }
}
else {
if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
@@ -638,7 +684,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
/*force primary*/false,
expiry,
skipVals,
- /*can remap*/true).get();
+ /*can remap*/true,
+ needVer).get();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 2c6c3df..6621884 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
@@ -73,6 +74,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** Dummy version for any existing entry read in SERIALIZABLE transaction. */
public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new GridCacheVersion(0, 0, 0, 1);
+ /** */
+ public static final GridCacheVersion GET_ENTRY_INVALID_VER_UPDATED = new GridCacheVersion(0, 0, 0, 2);
+
+ /** */
+ public static final GridCacheVersion GET_ENTRY_INVALID_VER_AFTER_GET = new GridCacheVersion(0, 0, 0, 3);
+
/** Owning transaction. */
@GridToStringExclude
@GridDirectTransient
@@ -909,13 +916,30 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
}
/**
- * @param serReadVer Read version for serializable transaction.
+ * Gets stored entry version. Version is stored for all entries in serializable transaction or
+ * when value is read using {@link IgniteCache#getEntry(Object)} method.
+ *
+ * @return Entry version.
+ */
+ @Nullable public GridCacheVersion entryReadVersion() {
+ return serReadVer;
+ }
+
+ /**
+ * @param ver Entry version.
*/
- public void serializableReadVersion(GridCacheVersion serReadVer) {
+ public void entryReadVersion(GridCacheVersion ver) {
assert this.serReadVer == null;
- assert serReadVer != null;
+ assert ver != null;
- this.serReadVer = serReadVer;
+ this.serReadVer = ver;
+ }
+
+ /**
+ * Clears recorded read version, should be done before starting commit of not serializable/optimistic transaction.
+ */
+ public void clearEntryReadVersion() {
+ serReadVer = null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index bf80e0b..10419b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -1334,7 +1335,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/**
* Checks if there is a cached or swapped value for
- * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method.
+ * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean, boolean)} method.
*
* @param cacheCtx Cache context.
* @param keys Key to enlist.
@@ -1360,7 +1361,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
boolean deserializeBinary,
boolean skipVals,
boolean keepCacheObjects,
- boolean skipStore
+ boolean skipStore,
+ final boolean needVer
) throws IgniteCheckedException {
assert !F.isEmpty(keys);
assert keysCnt == keys.size();
@@ -1373,7 +1375,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
AffinityTopologyVersion topVer = topologyVersion();
- boolean needReadVer = serializable() && optimistic();
+ boolean needReadVer = (serializable() && optimistic()) || needVer;
// In this loop we cover only read-committed or optimistic transactions.
// Transactions that are pessimistic and not read-committed are covered
@@ -1395,31 +1397,89 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (!F.isEmpty(txEntry.entryProcessors()))
val = txEntry.applyEntryProcessors(val);
- if (val != null)
- cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false);
+ if (val != null) {
+ GridCacheVersion ver = null;
+
+ if (needVer) {
+ if (txEntry.op() != READ)
+ ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
+ else {
+ ver = txEntry.entryReadVersion();
+
+ if (ver == null && pessimistic()) {
+ while (true) {
+ try {
+ GridCacheEntryEx cached = txEntry.cached();
+
+ ver = cached.isNear() ?
+ ((GridNearCacheEntry)cached).dhtVersion() : cached.version();
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException rmvdErr) {
+ txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+ }
+ }
+ }
+
+ if (ver == null) {
+ assert optimistic() && repeatableRead() : this;
+
+ ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
+ }
+ }
+
+ assert ver != null;
+ }
+
+ cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false, ver);
+ }
}
else {
assert txEntry.op() == TRANSFORM;
while (true) {
try {
+ GridCacheVersion readVer = null;
+
Object transformClo =
- (txEntry.op() == TRANSFORM && cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+ (txEntry.op() == TRANSFORM &&
+ cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
F.first(txEntry.entryProcessors()) : null;
- val = txEntry.cached().innerGet(this,
- /*swap*/true,
- /*read-through*/false,
- /*fail fast*/true,
- /*unmarshal*/true,
- /*metrics*/true,
- /*event*/!skipVals,
- /*temporary*/false,
- CU.subjectId(this, cctx),
- transformClo,
- resolveTaskName(),
- null,
- txEntry.keepBinary());
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = txEntry.cached().innerGetVersioned(
+ this,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /*update-metrics*/true,
+ /*event*/!skipVals,
+ CU.subjectId(this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary());
+
+ if (res != null) {
+ val = res.get1();
+ readVer = res.get2();
+ }
+ }
+ else {
+ val = txEntry.cached().innerGet(this,
+ /*swap*/true,
+ /*read-through*/false,
+ /*fail fast*/true,
+ /*unmarshal*/true,
+ /*metrics*/true,
+ /*event*/!skipVals,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary());
+ }
if (val != null) {
if (!readCommitted() && !skipVals)
@@ -1434,7 +1494,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
skipVals,
keepCacheObjects,
deserializeBinary,
- false);
+ false,
+ readVer);
}
else
missed.put(key, txEntry.cached().version());
@@ -1509,7 +1570,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
skipVals,
keepCacheObjects,
deserializeBinary,
- false);
+ false,
+ needVer ? readVer : null);
}
else
missed.put(key, ver);
@@ -1541,7 +1603,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (needReadVer) {
assert readVer != null;
- txEntry.serializableReadVersion(readVer);
+ txEntry.entryReadVersion(readVer);
}
}
}
@@ -1592,7 +1654,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/**
* Loads all missed keys for
- * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method.
+ * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean, boolean)} method.
*
* @param cacheCtx Cache context.
* @param map Return map.
@@ -1610,12 +1672,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
final boolean deserializeBinary,
final boolean skipVals,
final boolean keepCacheObjects,
- final boolean skipStore
+ final boolean skipStore,
+ final boolean needVer
+
) {
if (log.isDebugEnabled())
log.debug("Loading missed values for missed map: " + missedMap);
- final boolean needReadVer = serializable() && optimistic();
+ final boolean needReadVer = (serializable() && optimistic()) || needVer;
return new GridEmbeddedFuture<>(
new C2<Void, Exception, Map<K, V>>() {
@@ -1677,7 +1741,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
skipVals,
keepCacheObjects,
deserializeBinary,
- false);
+ false,
+ needVer ? loadVer : null);
}
}
else {
@@ -1688,7 +1753,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (needReadVer) {
assert loadVer != null;
- txEntry.serializableReadVersion(loadVer);
+ txEntry.entryReadVersion(loadVer);
}
if (visibleVal != null) {
@@ -1698,7 +1763,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
skipVals,
keepCacheObjects,
deserializeBinary,
- false);
+ false,
+ needVer ? loadVer : null);
}
}
}
@@ -1707,13 +1773,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
final GridCacheContext cacheCtx,
Collection<KeyCacheObject> keys,
final boolean deserializeBinary,
final boolean skipVals,
final boolean keepCacheObjects,
- final boolean skipStore) {
+ final boolean skipStore,
+ final boolean needVer) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -1743,7 +1811,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
deserializeBinary,
skipVals,
keepCacheObjects,
- skipStore);
+ skipStore,
+ needVer);
if (single && missed.isEmpty())
return new GridFinishedFuture<>(retMap);
@@ -1789,25 +1858,48 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
while (true) {
GridCacheEntryEx cached = txEntry.cached();
+ CacheObject val = null;
+ GridCacheVersion readVer = null;
+
try {
Object transformClo =
(!F.isEmpty(txEntry.entryProcessors()) &&
cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
F.first(txEntry.entryProcessors()) : null;
- CacheObject val = cached.innerGet(IgniteTxLocalAdapter.this,
- cacheCtx.isSwapOrOffheapEnabled(),
- /*read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
- /*metrics*/true,
- /*events*/!skipVals,
- /*temporary*/true,
- CU.subjectId(IgniteTxLocalAdapter.this, cctx),
- transformClo,
- resolveTaskName(),
- null,
- txEntry.keepBinary());
+ if (needVer) {
+ T2<CacheObject, GridCacheVersion> res = cached.innerGetVersioned(
+ IgniteTxLocalAdapter.this,
+ /*swap*/cacheCtx.isSwapOrOffheapEnabled(),
+ /*unmarshal*/true,
+ /**update-metrics*/true,
+ /*event*/!skipVals,
+ CU.subjectId(IgniteTxLocalAdapter.this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary());
+
+ if (res != null) {
+ val = res.get1();
+ readVer = res.get2();
+ }
+ }
+ else{
+ val = cached.innerGet(IgniteTxLocalAdapter.this,
+ cacheCtx.isSwapOrOffheapEnabled(),
+ /*read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /*metrics*/true,
+ /*events*/!skipVals,
+ /*temporary*/true,
+ CU.subjectId(IgniteTxLocalAdapter.this, cctx),
+ transformClo,
+ resolveTaskName(),
+ null,
+ txEntry.keepBinary());
+ }
// If value is in cache and passed the filter.
if (val != null) {
@@ -1824,7 +1916,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
skipVals,
keepCacheObjects,
deserializeBinary,
- false);
+ false,
+ readVer);
+
+ if (readVer != null)
+ txEntry.entryReadVersion(readVer);
}
// Even though we bring the value back from lock acquisition,
@@ -1850,7 +1946,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
deserializeBinary,
skipVals,
keepCacheObjects,
- skipStore);
+ skipStore,
+ needVer);
}
return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -1916,7 +2013,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
deserializeBinary,
skipVals,
keepCacheObjects,
- skipStore);
+ skipStore,
+ needVer);
}
return new GridFinishedFuture<>(retMap);
@@ -2307,7 +2405,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (needReadVer) {
assert loadVer != null;
- e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
+ e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
}
if (singleRmv) {
@@ -2500,7 +2598,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (needReadVer) {
assert readVer != null;
- txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+ txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
}
}
@@ -2553,7 +2651,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (needReadVer) {
assert readVer != null;
- txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+ txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
}
if (retval && !transform)
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index a5d3373..78f517c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -76,7 +76,8 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
boolean deserializeBinary,
boolean skipVals,
boolean keepCacheObjects,
- boolean skipStore);
+ boolean skipStore,
+ boolean needVer);
/**
* @param cacheCtx Cache context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 818ac11..2c1205a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1417,7 +1417,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert !entry1.detached() : "Expected non-detached entry for near transaction " +
"[locNodeId=" + cctx.localNodeId() + ", entry=" + entry1 + ']';
- GridCacheVersion serReadVer = txEntry1.serializableReadVersion();
+ GridCacheVersion serReadVer = txEntry1.entryReadVersion();
assert serReadVer == null || (tx.optimistic() && tx.serializable()) : txEntry1;
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
new file mode 100644
index 0000000..c0ba42c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Test getEntry and getEntries methods.
+ */
+public abstract class CacheGetEntryAbstractTest extends GridCacheAbstractSelfTest {
+ /** */
+ private static final String UPDATED_ENTRY_ERR = "Impossible to get version for entry updated in transaction";
+
+ /** */
+ private static final String ENTRY_AFTER_GET_ERR = "Impossible to get entry version after get()";
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 3;
+ }
+
+ /**
+ * @return Transaction concurrency.
+ */
+ abstract protected TransactionConcurrency concurrency();
+
+ /**
+ *
+ * @return Transaction isolation.
+ */
+ abstract protected TransactionIsolation isolation();
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 60_000;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setMarshaller(null);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNear() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(PARTITIONED);
+ cfg.setAtomicityMode(ATOMIC);
+ cfg.setName("near");
+ cfg.setNearConfiguration(new NearCacheConfiguration());
+
+ test(cfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNearTransactional() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(PARTITIONED);
+ cfg.setAtomicityMode(TRANSACTIONAL);
+ cfg.setName("nearT");
+ cfg.setNearConfiguration(new NearCacheConfiguration());
+
+ test(cfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitioned() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(PARTITIONED);
+ cfg.setAtomicityMode(ATOMIC);
+ cfg.setName("partitioned");
+
+ test(cfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionedTransactional() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(PARTITIONED);
+ cfg.setAtomicityMode(TRANSACTIONAL);
+ cfg.setName("partitionedT");
+
+ test(cfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLocal() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(LOCAL);
+ cfg.setAtomicityMode(ATOMIC);
+ cfg.setName("local");
+
+ test(cfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLocalTransactional() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(LOCAL);
+ cfg.setAtomicityMode(TRANSACTIONAL);
+ cfg.setName("localT");
+
+ test(cfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicated() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(REPLICATED);
+ cfg.setAtomicityMode(ATOMIC);
+ cfg.setName("replicated");
+
+ test(cfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedTransactional() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(REPLICATED);
+ cfg.setAtomicityMode(TRANSACTIONAL);
+ cfg.setName("replicatedT");
+
+ test(cfg);
+ }
+
+ /**
+ * @param cfg Cache configuration.
+ * @throws Exception If failed.
+ */
+ private void test(CacheConfiguration cfg) throws Exception {
+ test(cfg, true);
+
+ test(cfg, false);
+ }
+
+ /**
+ * @param cfg Cache configuration.
+ * @param oneEntry If {@code true} then single entry is tested.
+ * @throws Exception If failed.
+ */
+ private void test(CacheConfiguration cfg, final boolean oneEntry) throws Exception {
+ final IgniteCache<Integer, TestValue> cache = grid(0).createCache(cfg);
+
+ try {
+ init(cache);
+
+ test(cache, null, null, null, oneEntry);
+
+ if (cfg.getAtomicityMode() == TRANSACTIONAL) {
+ TransactionConcurrency txConcurrency = concurrency();
+ TransactionIsolation txIsolation = isolation();
+
+ try (Transaction tx = grid(0).transactions().txStart(txConcurrency, txIsolation)) {
+ initTx(cache);
+
+ test(cache, txConcurrency, txIsolation, tx, oneEntry);
+
+ tx.commit();
+ }
+
+ testConcurrentTx(cache, OPTIMISTIC, REPEATABLE_READ, oneEntry);
+ testConcurrentTx(cache, OPTIMISTIC, READ_COMMITTED, oneEntry);
+
+ testConcurrentTx(cache, PESSIMISTIC, REPEATABLE_READ, oneEntry);
+ testConcurrentTx(cache, PESSIMISTIC, READ_COMMITTED, oneEntry);
+ }
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param txConcurrency Transaction concurrency.
+ * @param txIsolation Transaction isolation.
+ * @param oneEntry If {@code true} then single entry is tested.
+ * @throws Exception If failed.
+ */
+ private void testConcurrentTx(final IgniteCache<Integer, TestValue> cache,
+ final TransactionConcurrency txConcurrency,
+ final TransactionIsolation txIsolation,
+ final boolean oneEntry) throws Exception {
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ IgniteTransactions txs = grid(0).transactions();
+
+ long stopTime = System.currentTimeMillis() + 3000;
+
+ while (System.currentTimeMillis() < stopTime) {
+ Set<Integer> keys = new LinkedHashSet<>();
+
+ for (int i = 0; i < 100; i++)
+ keys.add(i);
+
+ try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
+ if (oneEntry) {
+ for (int i = 0; i < 100; i++)
+ cache.getEntry(i);
+ }
+ else
+ cache.getEntries(keys);
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, new TestValue(i));
+
+ tx.commit();
+ }
+ }
+
+ return null;
+ }
+ }, 10, "tx-thread");
+ }
+
+ /**
+ * @param base Start value.
+ * @return Keys.
+ */
+ private Set<Integer> getKeys(int base) {
+ int start = 0;
+ int finish = 100;
+
+ Set<Integer> keys = new HashSet<>(finish - start);
+
+ for (int i = base + start; i < base + finish; ++i)
+ keys.add(i);
+
+ return keys;
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdBeforeTxKeys() {
+ return getKeys(0);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdBeforeTxWithBinaryKeys() {
+ return getKeys(1_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdBeforeTxKeys2() {
+ return getKeys(2_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdBeforeTxWithBinaryKeys2() {
+ return getKeys(3_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdBeforeTxKeys3() {
+ return getKeys(4_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdBeforeTxWithBinaryKeys3() {
+ return getKeys(5_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> removedBeforeTxKeys() {
+ return getKeys(6_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> removedBeforeTxWithBinaryKeys() {
+ return getKeys(7_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdAtTxKeys() {
+ return getKeys(8_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdAtTxWithBinaryKeys() {
+ return getKeys(9_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> removedAtTxKeys() {
+ return getKeys(10_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> removedAtTxWithBinaryKeys() {
+ return getKeys(11_000);
+ }
+
+ /**
+ * @param cache Cacge.
+ */
+ private void init(IgniteCache<Integer, TestValue> cache) {
+ Set<Integer> keys = new HashSet<>();
+
+ keys.addAll(createdBeforeTxKeys());
+ keys.addAll(createdBeforeTxWithBinaryKeys());
+ keys.addAll(createdBeforeTxKeys2());
+ keys.addAll(createdBeforeTxWithBinaryKeys2());
+ keys.addAll(createdBeforeTxKeys3());
+ keys.addAll(createdBeforeTxWithBinaryKeys3());
+ keys.addAll(removedBeforeTxKeys());
+ keys.addAll(removedBeforeTxWithBinaryKeys());
+ keys.addAll(removedAtTxKeys());
+ keys.addAll(removedAtTxWithBinaryKeys());
+
+ for (int i : keys)
+ cache.put(i, new TestValue(i));
+
+ for (int i : removedBeforeTxKeys())
+ cache.remove(i);
+
+ for (int i : removedBeforeTxWithBinaryKeys())
+ cache.remove(i);
+ }
+
+ /**
+ * @param cache Cache.
+ */
+ private void initTx(IgniteCache<Integer, TestValue> cache) {
+ for (int i : createdAtTxKeys())
+ cache.put(i, new TestValue(i));
+
+ for (int i : createdAtTxWithBinaryKeys())
+ cache.put(i, new TestValue(i));
+
+ for (int i : removedAtTxKeys())
+ cache.remove(i);
+
+ for (int i : removedAtTxWithBinaryKeys())
+ cache.remove(i);
+ }
+
+ /**
+ * @param e Entry.
+ * @param cache Cache.
+ * @throws Exception If failed.
+ */
+ private void compareVersionWithPrimaryNode(CacheEntry<Integer, ?> e, IgniteCache<Integer, TestValue> cache)
+ throws Exception {
+ CacheConfiguration cfg = cache.getConfiguration(CacheConfiguration.class);
+
+ if (cfg.getCacheMode() != LOCAL) {
+ Ignite prim = primaryNode(e.getKey(), cache.getName());
+
+ GridCacheAdapter<Object, Object> cacheAdapter = ((IgniteKernal)prim).internalCache(cache.getName());
+
+ if (cfg.getNearConfiguration() != null)
+ cacheAdapter = ((GridNearCacheAdapter)cacheAdapter).dht();
+
+ IgniteCacheObjectProcessor cacheObjects = cacheAdapter.context().cacheObjects();
+
+ CacheObjectContext cacheObjCtx = cacheAdapter.context().cacheObjectContext();
+
+ GridCacheMapEntry mapEntry = cacheAdapter.map().getEntry(cacheObjects.toCacheKeyObject(
+ cacheObjCtx, e.getKey(), true));
+
+ assertNotNull("No entry for key: " + e.getKey(), mapEntry);
+ assertEquals(mapEntry.version(), e.version());
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param i Key.
+ * @param oneEntry If {@code true} then single entry is tested.
+ * @param getVerErr Not null error if entry version access should fail.
+ * @param expKeys Expected keys with values.
+ * @throws Exception If failed.
+ */
+ private void checkData(IgniteCache<Integer, TestValue> cache,
+ int i,
+ boolean oneEntry,
+ @Nullable String getVerErr,
+ Set<Integer> expKeys) throws Exception {
+ if (oneEntry) {
+ final CacheEntry<Integer, TestValue> e = cache.getEntry(i);
+
+ if (getVerErr == null)
+ compareVersionWithPrimaryNode(e, cache);
+ else {
+ Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ e.version();
+
+ return null;
+ }
+ }, IgniteException.class, null);
+
+ assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr));
+ }
+
+ assertEquals(e.getValue().val, i);
+ }
+ else {
+ Set<Integer> set = new HashSet<>();
+
+ int expCnt = 0;
+
+ for (int j = 0; j < 10; j++) {
+ Integer key = i + j;
+
+ set.add(key);
+
+ if (expKeys.contains(key))
+ expCnt++;
+ }
+
+ Collection<CacheEntry<Integer, TestValue>> entries = cache.getEntries(set);
+
+ assertEquals(expCnt, entries.size());
+
+ for (final CacheEntry<Integer, TestValue> e : entries) {
+ if (getVerErr == null)
+ compareVersionWithPrimaryNode(e, cache);
+ else {
+ Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ e.version();
+
+ return null;
+ }
+ }, IgniteException.class, null);
+
+ assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr));
+ }
+
+ assertEquals((Integer)e.getValue().val, e.getKey());
+
+ assertTrue(set.contains(e.getValue().val));
+ }
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param i Key.
+ * @param oneEntry If {@code true} then single entry is tested.
+ * @param getVerErr Not null error if entry version access should fail.
+ * @param expKeys Expected keys with values.
+ * @throws Exception If failed.
+ */
+ private void checkBinaryData(IgniteCache<Integer, TestValue> cache,
+ int i,
+ boolean oneEntry,
+ @Nullable String getVerErr,
+ Set<Integer> expKeys) throws Exception {
+ IgniteCache<Integer, BinaryObject> cacheB = cache.withKeepBinary();
+
+ if (oneEntry) {
+ final CacheEntry<Integer, BinaryObject> e = cacheB.getEntry(i);
+
+ if (getVerErr == null)
+ compareVersionWithPrimaryNode(e, cache);
+ else {
+ Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ e.version();
+
+ return null;
+ }
+ }, IgniteException.class, null);
+
+ assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr));
+ }
+
+ assertEquals(((TestValue)e.getValue().deserialize()).val, i);
+ }
+ else {
+ Set<Integer> set = new HashSet<>();
+
+ int expCnt = 0;
+
+ for (int j = 0; j < 10; j++) {
+ Integer key = i + j;
+
+ set.add(key);
+
+ if (expKeys.contains(key))
+ expCnt++;
+ }
+
+ Collection<CacheEntry<Integer, BinaryObject>> entries = cacheB.getEntries(set);
+
+ assertEquals(expCnt, entries.size());
+
+ for (final CacheEntry<Integer, BinaryObject> e : entries) {
+ if (getVerErr == null)
+ compareVersionWithPrimaryNode(e, cache);
+ else {
+ Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ e.version();
+
+ return null;
+ }
+ }, IgniteException.class, null);
+
+ assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr));
+ }
+
+ TestValue tv = e.getValue().deserialize();
+
+ assertEquals((Integer)tv.val, e.getKey());
+
+ assertTrue(set.contains((tv).val));
+ }
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param i Key.
+ * @param oneEntry If {@code true} then single entry is tested.
+ */
+ private void checkRemoved(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry) {
+ if (oneEntry) {
+ CacheEntry<Integer, TestValue> e = cache.getEntry(i);
+
+ assertNull(e);
+ }
+ else {
+ Set<Integer> set = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ set.add(i + j);
+
+ Collection<CacheEntry<Integer, TestValue>> es = cache.getEntries(set);
+
+ assertTrue(es.isEmpty());
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param i Key.
+ * @param oneEntry If {@code true} then single entry is tested.
+ */
+ private void checkBinaryRemoved(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry) {
+ IgniteCache<Integer, BinaryObject> cacheB = cache.withKeepBinary();
+
+ if (oneEntry) {
+ CacheEntry<Integer, BinaryObject> e = cacheB.getEntry(i);
+
+ assertNull(e);
+ }
+ else {
+ Set<Integer> set = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ set.add(i + j);
+
+ Collection<CacheEntry<Integer, BinaryObject>> es = cacheB.getEntries(set);
+
+ assertTrue(es.isEmpty());
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param txConcurrency Transaction concurrency.
+ * @param txIsolation Transaction isolation.
+ * @param tx Transaction.
+ * @param oneEntry If {@code true} then single entry is tested.
+ * @throws Exception If failed.
+ */
+ private void test(IgniteCache<Integer, TestValue> cache,
+ TransactionConcurrency txConcurrency,
+ TransactionIsolation txIsolation,
+ Transaction tx,
+ boolean oneEntry) throws Exception {
+ if (tx == null) {
+ Set<Integer> keys = createdBeforeTxKeys();
+
+ for (int i : keys)
+ checkData(cache, i, oneEntry, null, keys);
+
+ keys = createdBeforeTxWithBinaryKeys();
+
+ for (int i : keys)
+ checkBinaryData(cache, i, oneEntry, null, keys);
+
+ for (int i : removedBeforeTxKeys())
+ checkRemoved(cache, i, oneEntry);
+
+ for (int i : removedBeforeTxWithBinaryKeys())
+ checkBinaryRemoved(cache, i, oneEntry);
+ }
+ else {
+ Set<Integer> keys = createdBeforeTxKeys2();
+
+ for (int i : keys) {
+ checkData(cache, i, oneEntry, null, keys);
+ checkData(cache, i, oneEntry, null, keys);
+ }
+
+ keys = createdBeforeTxWithBinaryKeys2();
+
+ for (int i : keys) {
+ checkBinaryData(cache, i, oneEntry, null, keys);
+ checkBinaryData(cache, i, oneEntry, null, keys);
+ }
+
+ String verGetErr = null;
+
+ if (txConcurrency == OPTIMISTIC && txIsolation == REPEATABLE_READ)
+ verGetErr = ENTRY_AFTER_GET_ERR;
+
+ keys = createdBeforeTxKeys3();
+
+ for (int i : keys) {
+ if (oneEntry)
+ cache.get(i);
+ else {
+ Set<Integer> set = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ set.add(i + j);
+
+ cache.getAll(set);
+ }
+
+ checkData(cache, i, oneEntry, verGetErr, keys);
+ }
+
+ keys = createdBeforeTxWithBinaryKeys3();
+
+ for (int i : keys) {
+ if (oneEntry)
+ cache.get(i);
+ else {
+ Set<Integer> set = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ set.add(i + j);
+
+ cache.getAll(set);
+ }
+
+ checkBinaryData(cache, i, oneEntry, verGetErr, keys);
+ }
+
+ keys = createdAtTxKeys();
+
+ for (int i : keys)
+ checkData(cache, i, oneEntry, UPDATED_ENTRY_ERR, keys);
+
+ keys = createdAtTxWithBinaryKeys();
+
+ for (int i : keys)
+ checkBinaryData(cache, i, oneEntry, UPDATED_ENTRY_ERR, keys);
+
+ for (int i : removedBeforeTxKeys())
+ checkRemoved(cache, i, oneEntry);
+
+ for (int i : removedBeforeTxWithBinaryKeys())
+ checkBinaryRemoved(cache, i, oneEntry);
+
+ for (int i : removedAtTxKeys())
+ checkRemoved(cache, i, oneEntry);
+
+ for (int i : removedAtTxWithBinaryKeys())
+ checkBinaryRemoved(cache, i, oneEntry);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestValue implements Serializable {
+ /** */
+ private int val;
+
+ /**
+ * @param val Value.
+ */
+ public TestValue(int val) {
+ this.val = val;
+ }
+
+ /**
+ * @return Value.
+ */
+ public int value() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TestValue.class, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java
new file mode 100644
index 0000000..acc21df
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ * Test getEntry and getEntries methods.
+ */
+public class CacheGetEntryOptimisticReadCommittedSeltTest extends CacheGetEntryAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected TransactionConcurrency concurrency() {
+ return TransactionConcurrency.OPTIMISTIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected TransactionIsolation isolation() {
+ return TransactionIsolation.READ_COMMITTED;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java
new file mode 100644
index 0000000..6153869
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ * Test getEntry and getEntries methods.
+ */
+public class CacheGetEntryOptimisticRepeatableReadSeltTest extends CacheGetEntryAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected TransactionConcurrency concurrency() {
+ return TransactionConcurrency.OPTIMISTIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected TransactionIsolation isolation() {
+ return TransactionIsolation.REPEATABLE_READ;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java
new file mode 100644
index 0000000..6ded4a9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ * Test getEntry and getEntries methods.
+ */
+public class CacheGetEntryOptimisticSerializableSeltTest extends CacheGetEntryAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected TransactionConcurrency concurrency() {
+ return TransactionConcurrency.OPTIMISTIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected TransactionIsolation isolation() {
+ return TransactionIsolation.SERIALIZABLE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java
new file mode 100644
index 0000000..975d271
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ * Test getEntry and getEntries methods.
+ */
+public class CacheGetEntryPessimisticReadCommittedSeltTest extends CacheGetEntryAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected TransactionConcurrency concurrency() {
+ return TransactionConcurrency.PESSIMISTIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected TransactionIsolation isolation() {
+ return TransactionIsolation.READ_COMMITTED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java
new file mode 100644
index 0000000..dac64d9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ * Test getEntry and getEntries methods.
+ */
+public class CacheGetEntryPessimisticRepeatableReadSeltTest extends CacheGetEntryAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected TransactionConcurrency concurrency() {
+ return TransactionConcurrency.PESSIMISTIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected TransactionIsolation isolation() {
+ return TransactionIsolation.REPEATABLE_READ;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java
new file mode 100644
index 0000000..70f71ce
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/**
+ * Test getEntry and getEntries methods.
+ */
+public class CacheGetEntryPessimisticSerializableSeltTest extends CacheGetEntryAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected TransactionConcurrency concurrency() {
+ return TransactionConcurrency.PESSIMISTIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected TransactionIsolation isolation() {
+ return TransactionIsolation.SERIALIZABLE;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java
index c606a2a..b60ada7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheReadThroughRestartSelfTest.java
@@ -85,6 +85,20 @@ public class CacheReadThroughRestartSelfTest extends GridCacheAbstractSelfTest {
* @throws Exception If failed.
*/
public void testReadThroughInTx() throws Exception {
+ testReadThroughInTx(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReadEntryThroughInTx() throws Exception {
+ testReadThroughInTx(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void testReadThroughInTx(boolean needVer) throws Exception {
IgniteCache<String, Integer> cache = grid(1).cache(null);
for (int k = 0; k < 1000; k++)
@@ -104,7 +118,14 @@ public class CacheReadThroughRestartSelfTest extends GridCacheAbstractSelfTest {
for (int k = 0; k < 1000; k++) {
String key = "key" + k;
- assertNotNull("Null value for key: " + key, cache.get(key));
+ if (needVer) {
+ assertNotNull("Null value for key: " + key, cache.getEntry(key));
+ assertNotNull("Null value for key: " + key, cache.getEntry(key));
+ }
+ else {
+ assertNotNull("Null value for key: " + key, cache.get(key));
+ assertNotNull("Null value for key: " + key, cache.get(key));
+ }
}
tx.commit();
@@ -117,6 +138,20 @@ public class CacheReadThroughRestartSelfTest extends GridCacheAbstractSelfTest {
* @throws Exception If failed.
*/
public void testReadThrough() throws Exception {
+ testReadThrough(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReadEntryThrough() throws Exception {
+ testReadThrough(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void testReadThrough(boolean needVer) throws Exception {
IgniteCache<String, Integer> cache = grid(1).cache(null);
for (int k = 0; k < 1000; k++)
@@ -132,8 +167,10 @@ public class CacheReadThroughRestartSelfTest extends GridCacheAbstractSelfTest {
for (int k = 0; k < 1000; k++) {
String key = "key" + k;
-
- assertNotNull("Null value for key: " + key, cache.get(key));
+ if (needVer)
+ assertNotNull("Null value for key: " + key, cache.getEntry(key));
+ else
+ assertNotNull("Null value for key: " + key, cache.get(key));
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/39c1bff2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 9906ad3..4baef66 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -46,6 +47,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.store.CacheStore;
@@ -575,6 +577,20 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testTxCommitReadOnlyGetAll() throws Exception {
+ testTxCommitReadOnlyGetAll(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxCommitReadOnlyGetEntries() throws Exception {
+ testTxCommitReadOnlyGetAll(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxCommitReadOnlyGetAll(boolean needVer) throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
@@ -591,9 +607,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
keys.add(i);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
- Map<Integer, Integer> map = cache.getAll(keys);
+ if (needVer) {
+ Collection<CacheEntry<Integer, Integer>> c = cache.getEntries(keys);
- assertTrue(map.isEmpty());
+ assertTrue(c.isEmpty());
+ }
+ else {
+ Map<Integer, Integer> map = cache.getAll(keys);
+
+ assertTrue(map.isEmpty());
+ }
tx.commit();
}
@@ -602,9 +625,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
checkValue(key, null, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
- Map<Integer, Integer> map = cache.getAll(keys);
+ if (needVer) {
+ Collection<CacheEntry<Integer, Integer>> c = cache.getEntries(keys);
- assertTrue(map.isEmpty());
+ assertTrue(c.isEmpty());
+ }
+ else {
+ Map<Integer, Integer> map = cache.getAll(keys);
+
+ assertTrue(map.isEmpty());
+ }
tx.rollback();
}
@@ -653,21 +683,35 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testTxConflictRead1() throws Exception {
- txConflictRead(true);
+ txConflictRead(true, false);
}
/**
* @throws Exception If failed.
*/
public void testTxConflictRead2() throws Exception {
- txConflictRead(false);
+ txConflictRead(false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReadEntry1() throws Exception {
+ txConflictRead(true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReadEntry2() throws Exception {
+ txConflictRead(false, true);
}
/**
* @param noVal If {@code true} there is no cache value when read in tx.
* @throws Exception If failed.
*/
- private void txConflictRead(boolean noVal) throws Exception {
+ private void txConflictRead(boolean noVal, boolean needVer) throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
@@ -693,9 +737,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
- Integer val = cache.get(key);
+ if (needVer) {
+ CacheEntry<Integer, Integer> val = cache.getEntry(key);
- assertEquals(expVal, val);
+ assertEquals(expVal, val == null ? null : val.getValue());
+ }
+ else {
+ Integer val = cache.get(key);
+
+ assertEquals(expVal, val);
+ }
updateKey(cache, key, 1);
@@ -711,9 +762,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
- Object val = cache.get(key);
+ if (needVer) {
+ CacheEntry<Integer, Integer> val = cache.getEntry(key);
- assertEquals(1, val);
+ assertEquals((Integer)1, val.getValue());
+ }
+ else {
+ Object val = cache.get(key);
+
+ assertEquals(1, val);
+ }
tx.commit();
}
@@ -731,28 +789,56 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testTxConflictReadWrite1() throws Exception {
- txConflictReadWrite(true, false);
+ txConflictReadWrite(true, false, false);
}
/**
* @throws Exception If failed.
*/
public void testTxConflictReadWrite2() throws Exception {
- txConflictReadWrite(false, false);
+ txConflictReadWrite(false, false, false);
}
/**
* @throws Exception If failed.
*/
public void testTxConflictReadRemove1() throws Exception {
- txConflictReadWrite(true, true);
+ txConflictReadWrite(true, true, false);
}
/**
* @throws Exception If failed.
*/
public void testTxConflictReadRemove2() throws Exception {
- txConflictReadWrite(false, true);
+ txConflictReadWrite(false, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReadEntryWrite1() throws Exception {
+ txConflictReadWrite(true, false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReadEntryWrite2() throws Exception {
+ txConflictReadWrite(false, false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReadEntryRemove1() throws Exception {
+ txConflictReadWrite(true, true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReadEntryRemove2() throws Exception {
+ txConflictReadWrite(false, true, true);
}
/**
@@ -760,7 +846,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
* @param rmv If {@code true} tests remove, otherwise put.
* @throws Exception If failed.
*/
- private void txConflictReadWrite(boolean noVal, boolean rmv) throws Exception {
+ private void txConflictReadWrite(boolean noVal, boolean rmv, boolean needVer) throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
@@ -786,9 +872,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
- Integer val = cache.get(key);
+ if (needVer) {
+ CacheEntry<Integer, Integer> val = cache.getEntry(key);
- assertEquals(expVal, val);
+ assertEquals(expVal, val == null ? null : val.getValue());
+ }
+ else {
+ Integer val = cache.get(key);
+
+ assertEquals(expVal, val);
+ }
updateKey(cache, key, 1);
@@ -809,9 +902,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
- Integer val = cache.get(key);
+ if (needVer) {
+ CacheEntry<Integer, Integer> val = cache.getEntry(key);
+
+ assertEquals(1, (Object)val.getValue());
+ }
+ else {
+ Integer val = cache.get(key);
- assertEquals(1, (Object) val);
+ assertEquals(1, (Object)val);
+ }
if (rmv)
cache.remove(key);
@@ -4239,7 +4339,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
ignite.destroyCache(cacheName);
}
catch (IgniteException ignore) {
- // No-op.
+ // No-op.
}
GridTestSwapSpaceSpi spi = (GridTestSwapSpaceSpi)ignite.configuration().getSwapSpaceSpi();