You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by an...@apache.org on 2015/10/28 10:30:48 UTC
[45/46] ignite git commit: ignite-1607 Implemented deadlock-free optimistic serializable tx mode
ignite-1607 Implemented deadlock-free optimistic serializable tx mode
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9d7543a8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9d7543a8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9d7543a8
Branch: refs/heads/ignite-843-rc1
Commit: 9d7543a8dedb85f1dc904b52be3673f4a151f8e7
Parents: db88860
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 28 10:53:29 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Oct 28 10:53:30 2015 +0300
----------------------------------------------------------------------
.../configuration/TransactionConfiguration.java | 6 +-
.../processors/cache/CacheMetricsImpl.java | 12 +-
.../processors/cache/GridCacheAdapter.java | 459 +-
.../processors/cache/GridCacheEntryEx.java | 62 +-
.../processors/cache/GridCacheMapEntry.java | 137 +-
.../processors/cache/GridCacheMvcc.java | 143 +-
.../cache/GridCacheMvccCandidate.java | 26 +-
.../processors/cache/GridCacheProcessor.java | 13 -
.../distributed/GridDistributedCacheEntry.java | 12 +-
.../GridDistributedTxRemoteAdapter.java | 56 +-
.../dht/CacheDistributedGetFutureAdapter.java | 158 +
.../distributed/dht/GridDhtCacheAdapter.java | 17 +-
.../distributed/dht/GridDhtCacheEntry.java | 23 +-
.../cache/distributed/dht/GridDhtGetFuture.java | 199 +-
.../distributed/dht/GridDhtLockFuture.java | 5 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 105 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 4 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 109 +-
.../dht/GridPartitionedGetFuture.java | 179 +-
.../dht/atomic/GridDhtAtomicCache.java | 16 +-
.../dht/colocated/GridDhtColocatedCache.java | 130 +-
.../colocated/GridDhtColocatedLockFuture.java | 6 +-
.../distributed/near/GridNearAtomicCache.java | 2 -
.../distributed/near/GridNearCacheAdapter.java | 32 +-
.../distributed/near/GridNearCacheEntry.java | 81 +-
.../distributed/near/GridNearGetFuture.java | 340 +-
.../distributed/near/GridNearGetRequest.java | 3 -
...arOptimisticSerializableTxPrepareFuture.java | 930 ++++
.../near/GridNearOptimisticTxPrepareFuture.java | 252 +-
...ridNearOptimisticTxPrepareFutureAdapter.java | 222 +
.../GridNearPessimisticTxPrepareFuture.java | 8 +-
.../near/GridNearTransactionalCache.java | 14 +-
.../near/GridNearTxFinishFuture.java | 12 +-
.../cache/distributed/near/GridNearTxLocal.java | 157 +-
.../near/GridNearTxPrepareFutureAdapter.java | 13 +-
.../cache/local/GridLocalCacheEntry.java | 23 +-
.../cache/local/GridLocalLockFuture.java | 2 +
.../local/atomic/GridLocalAtomicCache.java | 9 -
.../transactions/IgniteTransactionsImpl.java | 6 -
.../cache/transactions/IgniteTxAdapter.java | 123 +-
.../cache/transactions/IgniteTxEntry.java | 51 +-
.../cache/transactions/IgniteTxHandler.java | 4 +-
.../transactions/IgniteTxLocalAdapter.java | 832 ++--
.../cache/transactions/IgniteTxLocalEx.java | 27 +-
.../cache/transactions/IgniteTxManager.java | 327 +-
.../cache/version/GridCacheVersionManager.java | 73 +-
.../datastreamer/DataStreamerImpl.java | 2 +-
.../apache/ignite/transactions/Transaction.java | 2 +-
.../transactions/TransactionIsolation.java | 3 +-
.../cache/CacheNearReaderUpdateTest.java | 388 ++
.../CacheSerializableTransactionsTest.java | 4295 ++++++++++++++++++
.../cache/CrossCacheTxRandomOperationsTest.java | 6 +
.../GridCacheAbstractFailoverSelfTest.java | 14 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 4 +-
.../GridCacheAbstractRemoveFailureTest.java | 94 +-
.../GridCacheConcurrentTxMultiNodeTest.java | 3 -
.../cache/GridCacheMvccFlagsTest.java | 6 +-
.../cache/GridCacheMvccPartitionedSelfTest.java | 164 +
.../processors/cache/GridCacheMvccSelfTest.java | 3 +-
.../processors/cache/GridCacheTestEntryEx.java | 53 +-
.../processors/cache/IgniteTxAbstractTest.java | 42 +-
.../IgniteTxMultiThreadedAbstractTest.java | 106 +-
...onedNearDisabledTxMultiThreadedSelfTest.java | 31 +
...niteCacheClientNodeChangingTopologyTest.java | 170 +-
.../IgniteCacheCrossCacheTxFailoverTest.java | 19 +
.../dht/IgniteCacheLockFailoverSelfTest.java | 11 +
...eAtomicInvalidPartitionHandlingSelfTest.java | 6 +-
.../near/GridCacheNearTxExceptionSelfTest.java | 1 +
...CachePartitionedTxMultiThreadedSelfTest.java | 15 +-
.../DataStreamerUpdateAfterLoadTest.java | 184 +
.../loadtests/hashmap/GridHashMapLoadTest.java | 6 +-
.../inmemory/GridTestSwapSpaceSpi.java | 8 +
.../junits/common/GridCommonAbstractTest.java | 19 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 4 +-
.../testsuites/IgniteCacheTestSuite2.java | 2 +
.../testsuites/IgniteCacheTestSuite5.java | 40 +
.../config/benchmark-multicast.properties | 5 +-
.../IgniteAccountSerializableTxBenchmark.java | 81 +
.../cache/IgniteAccountTxAbstractBenchmark.java | 61 +
.../cache/IgniteAccountTxBenchmark.java | 74 +
.../cache/IgniteCacheAbstractBenchmark.java | 7 +-
.../IgnitePutAllSerializableTxBenchmark.java | 77 +
.../ignite/yardstick/cache/model/Account.java | 42 +
83 files changed, 9229 insertions(+), 2239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
index 7d3cebb..fc2a6cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
@@ -92,15 +92,17 @@ public class TransactionConfiguration implements Serializable {
*
* @return {@code True} if serializable transactions are enabled, {@code false} otherwise.
*/
+ @Deprecated
public boolean isTxSerializableEnabled() {
return txSerEnabled;
}
/**
- * Enables/disables serializable cache transactions. See {@link #isTxSerializableEnabled()} for more information.
- *
* @param txSerEnabled Flag to enable/disable serializable cache transactions.
+
+ * @deprecated This method has no effect, {@link TransactionIsolation#SERIALIZABLE} isolation is always enabled.
*/
+ @Deprecated
public void setTxSerializableEnabled(boolean txSerEnabled) {
this.txSerEnabled = txSerEnabled;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index dfa0217..a60c22b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -351,7 +351,7 @@ public class CacheMetricsImpl implements CacheMetrics {
/** {@inheritDoc} */
@Override public int getTxCommitQueueSize() {
- return cctx.tm().commitQueueSize();
+ return 0;
}
/** {@inheritDoc} */
@@ -366,12 +366,12 @@ public class CacheMetricsImpl implements CacheMetrics {
/** {@inheritDoc} */
@Override public int getTxPrepareQueueSize() {
- return cctx.tm().prepareQueueSize();
+ return 0;
}
/** {@inheritDoc} */
@Override public int getTxStartVersionCountsSize() {
- return cctx.tm().startVersionCountsSize();
+ return 0;
}
/** {@inheritDoc} */
@@ -396,17 +396,17 @@ public class CacheMetricsImpl implements CacheMetrics {
/** {@inheritDoc} */
@Override public int getTxDhtCommitQueueSize() {
- return cctx.isNear() && dhtCtx != null ? dhtCtx.tm().commitQueueSize() : -1;
+ return 0;
}
/** {@inheritDoc} */
@Override public int getTxDhtPrepareQueueSize() {
- return cctx.isNear() && dhtCtx != null ? dhtCtx.tm().prepareQueueSize() : -1;
+ return 0;
}
/** {@inheritDoc} */
@Override public int getTxDhtStartVersionCountsSize() {
- return cctx.isNear() && dhtCtx != null ? dhtCtx.tm().startVersionCountsSize() : -1;
+ return 0;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 417b396..74951b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -608,7 +608,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
Collections.singletonList(key),
/*force primary*/false,
/*skip tx*/false,
- /*entry*/null,
/*subj id*/null,
/*task name*/null,
/*deserialize portable*/false,
@@ -643,7 +642,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
keys,
/*force primary*/false,
/*skip tx*/false,
- /*entry*/null,
/*subj id*/null,
/*task name*/null,
/*deserialize portable*/false,
@@ -1273,7 +1271,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
F.asList(key),
/*force primary*/true,
/*skip tx*/false,
- /*cached entry*/null,
/*subject id*/null,
taskName,
/*deserialize cache objects*/true,
@@ -1291,7 +1288,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/*force primary*/true,
/*skip tx*/false,
null,
- null,
taskName,
true,
false,
@@ -1317,7 +1313,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
F.asList(key),
/*force primary*/false,
/*skip tx*/false,
- /*cached entry*/null,
/*subject id*/null,
taskName,
/*deserialize cache objects*/true,
@@ -1339,7 +1334,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
!ctx.config().isReadFromBackup(),
/*skip tx*/true,
null,
- null,
taskName,
!ctx.keepPortable(),
/*skip values*/false,
@@ -1347,184 +1341,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
- * @param keys Keys.
- * @param reload Reload flag.
- * @param tx Transaction.
- * @param subjId Subject ID.
- * @param taskName Task name.
- * @param vis Visitor.
- * @return Future.
- */
- public IgniteInternalFuture<Object> readThroughAllAsync(final Collection<KeyCacheObject> keys,
- boolean reload,
- boolean skipVals,
- @Nullable final IgniteInternalTx tx,
- @Nullable UUID subjId,
- String taskName,
- final IgniteBiInClosure<KeyCacheObject, Object> vis) {
- return ctx.closures().callLocalSafe(new GPC<Object>() {
- @Nullable @Override public Object call() {
- try {
- ctx.store().loadAll(tx, keys, vis);
- }
- catch (IgniteCheckedException e) {
- throw new GridClosureException(e);
- }
-
- return null;
- }
- }, true);
- }
-
- /**
- * @param keys Keys.
- * @param ret Return flag.
- * @param skipVals Skip values flag.
- * @param subjId Subject ID.
- * @param taskName Task name.
- * @return Future.
- */
- public IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> reloadAllAsync0(
- Collection<KeyCacheObject> keys,
- boolean ret,
- boolean skipVals,
- @Nullable UUID subjId,
- String taskName)
- {
- final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
-
- if (!F.isEmpty(keys)) {
- final Map<KeyCacheObject, GridCacheVersion> keyVers = new HashMap();
-
- for (KeyCacheObject key : keys) {
- if (key == null)
- continue;
-
- // Skip primary or backup entries for near cache.
- if (ctx.isNear() && ctx.affinity().localNode(key, topVer))
- continue;
-
- while (true) {
- try {
- GridCacheEntryEx entry = entryExSafe(key, topVer);
-
- if (entry == null)
- break;
-
- GridCacheVersion ver = entry.version();
-
- keyVers.put(key, ver);
-
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry for reload (will retry): " + key);
- }
- catch (GridDhtInvalidPartitionException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got invalid partition for key (will skip): " + key);
-
- break;
- }
- }
- }
-
- final Map<KeyCacheObject, CacheObject> map =
- ret ? U.<KeyCacheObject, CacheObject>newHashMap(keys.size()) : null;
-
- final Collection<KeyCacheObject> absentKeys = F.view(keyVers.keySet());
-
- final Collection<KeyCacheObject> loadedKeys = new GridConcurrentHashSet<>();
-
- IgniteInternalFuture<Object> readFut = readThroughAllAsync(absentKeys, true, skipVals, null,
- subjId, taskName, new CI2<KeyCacheObject, Object>() {
- /** Version for all loaded entries. */
- private GridCacheVersion nextVer = ctx.versions().next();
-
- /** {@inheritDoc} */
- @Override public void apply(KeyCacheObject key, Object val) {
- loadedKeys.add(key);
-
- GridCacheEntryEx entry = peekEx(key);
-
- if (entry != null) {
- try {
- GridCacheVersion curVer = keyVers.get(key);
-
- if (curVer != null) {
- boolean wasNew = entry.isNewLocked();
-
- entry.unswap();
-
- CacheObject cacheVal = ctx.toCacheObject(val);
-
- boolean set = entry.versionedValue(cacheVal, curVer, nextVer);
-
- ctx.evicts().touch(entry, topVer);
-
- if (map != null) {
- if (set || wasNew)
- map.put(key, cacheVal);
- else {
- CacheObject v = entry.peek(true, false, false, null);
-
- if (v != null)
- map.put(key, v);
- }
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Set value loaded from store into entry [set=" + set + ", " +
- "curVer=" +
- curVer + ", newVer=" + nextVer + ", entry=" + entry + ']');
- }
- }
- else {
- if (log.isDebugEnabled()) {
- log.debug("Current version was not found (either entry was removed or " +
- "validation was not passed: " + entry);
- }
- }
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled()) {
- log.debug("Got removed entry for reload (will not store reloaded entry) " +
- "[entry=" + entry + ']');
- }
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }
- });
-
- return readFut.chain(new CX1<IgniteInternalFuture<Object>, Map<KeyCacheObject, CacheObject>>() {
- @Override public Map<KeyCacheObject, CacheObject> applyx(IgniteInternalFuture<Object> e)
- throws IgniteCheckedException {
- // Touch all not loaded keys.
- for (KeyCacheObject key : absentKeys) {
- if (!loadedKeys.contains(key)) {
- GridCacheEntryEx entry = peekEx(key);
-
- if (entry != null)
- ctx.evicts().touch(entry, topVer);
- }
- }
-
- // Make sure there were no exceptions.
- e.get();
-
- return map;
- }
- });
- }
-
- return new GridFinishedFuture<>(Collections.<KeyCacheObject, CacheObject>emptyMap());
- }
-
- /**
* @param key Key.
* @param topVer Topology version.
* @return Entry.
@@ -1662,11 +1478,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param keys Keys.
* @param forcePrimary Force primary.
* @param skipTx Skip tx.
- * @param entry Entry.
* @param subjId Subj Id.
* @param taskName Task name.
* @param deserializePortable Deserialize portable.
* @param skipVals Skip values.
+ * @param canRemap Can remap flag.
* @return Future for the get operation.
* @see GridCacheAdapter#getAllAsync(Collection)
*/
@@ -1674,7 +1490,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
@Nullable Collection<? extends K> keys,
boolean forcePrimary,
boolean skipTx,
- @Nullable GridCacheEntryEx entry,
@Nullable UUID subjId,
String taskName,
boolean deserializePortable,
@@ -1687,7 +1502,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return getAllAsync(keys,
opCtx == null || !opCtx.skipStore(),
- entry,
!skipTx,
subjId,
taskName,
@@ -1701,7 +1515,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* @param keys Keys.
* @param readThrough Read through.
- * @param cached Cached.
* @param checkTx Check tx.
* @param subjId Subj Id.
* @param taskName Task name.
@@ -1709,12 +1522,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param forcePrimary Force primary.
* @param expiry Expiry policy.
* @param skipVals Skip values.
+ * @param canRemap Can remap flag.
* @return Future for the get operation.
* @see GridCacheAdapter#getAllAsync(Collection)
*/
public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys,
boolean readThrough,
- @Nullable GridCacheEntryEx cached,
boolean checkTx,
@Nullable final UUID subjId,
final String taskName,
@@ -1738,7 +1551,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
expiry,
skipVals,
false,
- canRemap);
+ canRemap,
+ false);
}
/**
@@ -1750,10 +1564,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param deserializePortable Deserialize portable flag.
* @param expiry Expiry policy.
* @param skipVals Skip values flag.
- * @param keepCacheObjects Keep cache objects
+ * @param keepCacheObjects Keep cache objects.
+ * @param canRemap Can remap flag.
+ * @param needVer If {@code true} returns values as tuples containing value and version.
* @return Future.
*/
- public <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final Collection<KeyCacheObject> keys,
+ public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final Collection<KeyCacheObject> keys,
final boolean readThrough,
boolean checkTx,
@Nullable final UUID subjId,
@@ -1762,7 +1578,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
@Nullable IgniteCacheExpiryPolicy expiry,
final boolean skipVals,
final boolean keepCacheObjects,
- boolean canRemap
+ boolean canRemap,
+ final boolean needVer
) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap());
@@ -1782,14 +1599,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (tx == null || tx.implicit()) {
try {
- assert keys != null;
-
final AffinityTopologyVersion topVer = tx == null
? (canRemap ? ctx.affinity().affinityTopologyVersion(): ctx.shared().exchange().readyAffinityVersion())
: tx.topologyVersion();
final Map<K1, V1> map = new GridLeanMap<>(keys.size());
+ final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough();
+
Map<KeyCacheObject, GridCacheVersion> misses = null;
for (KeyCacheObject key : keys) {
@@ -1797,29 +1614,43 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
GridCacheEntryEx entry = entryEx(key);
try {
- CacheObject val = entry.innerGet(null,
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(null,
ctx.isSwapOrOffheapEnabled(),
- /*don't read-through*/false,
- /*fail-fast*/true,
/*unmarshal*/true,
/*update-metrics*/!skipVals,
/*event*/!skipVals,
- /*temporary*/false,
subjId,
null,
taskName,
expiry);
- if (val == null) {
- GridCacheVersion ver = entry.version();
+ if (res == null) {
+ if (storeEnabled) {
+ GridCacheVersion ver = entry.version();
- if (misses == null)
- misses = new GridLeanMap<>();
+ if (misses == null)
+ misses = new GridLeanMap<>();
- misses.put(key, ver);
+ misses.put(key, ver);
+ }
+ else
+ ctx.evicts().touch(entry, topVer);
}
else {
- ctx.addResult(map, key, val, skipVals, keepCacheObjects, deserializePortable, true);
+ if (needVer) {
+ assert keepCacheObjects;
+
+ map.put((K1)key, (V1)new T2<>(res.get1(), res.get2()));
+ }
+ else {
+ ctx.addResult(map,
+ key,
+ res.get1(),
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ true);
+ }
if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
ctx.evicts().touch(entry, topVer);
@@ -1835,19 +1666,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (log.isDebugEnabled())
log.debug("Got removed entry in getAllAsync(..) method (will retry): " + key);
}
- catch (GridCacheFilterFailedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Filter validation failed for entry: " + entry);
-
- if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
- ctx.evicts().touch(entry, topVer);
-
- break; // While loop.
- }
}
}
- if (!skipVals && misses != null && readThrough && ctx.readThrough()) {
+ if (storeEnabled && misses != null) {
final Map<KeyCacheObject, GridCacheVersion> loadKeys = misses;
final IgniteTxLocalAdapter tx0 = tx;
@@ -1858,9 +1680,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.closures().callLocalSafe(ctx.projectSafe(new GPC<Map<K1, V1>>() {
@Override public Map<K1, V1> call() throws Exception {
ctx.store().loadAll(null/*tx*/, loadKeys.keySet(), new CI2<KeyCacheObject, Object>() {
- /** New version for all new entries. */
- private GridCacheVersion nextVer;
-
@Override public void apply(KeyCacheObject key, Object val) {
GridCacheVersion ver = loadKeys.get(key);
@@ -1872,10 +1691,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return;
}
- // Initialize next version.
- if (nextVer == null)
- nextVer = ctx.versions().next();
-
loaded.add(key);
CacheObject cacheVal = ctx.toCacheObject(val);
@@ -1884,22 +1699,33 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
GridCacheEntryEx entry = entryEx(key);
try {
- boolean set = entry.versionedValue(cacheVal, ver, nextVer);
+ GridCacheVersion verSet = entry.versionedValue(cacheVal, ver, null);
+
+ boolean set = verSet != null;
if (log.isDebugEnabled())
- log.debug("Set value loaded from store into entry [set=" + set +
- ", curVer=" + ver + ", newVer=" + nextVer + ", " +
+ log.debug("Set value loaded from store into entry [" +
+ "set=" + set +
+ ", curVer=" + ver +
+ ", newVer=" + verSet + ", " +
"entry=" + entry + ']');
// Don't put key-value pair into result map if value is null.
if (val != null) {
- ctx.addResult(map,
- key,
- cacheVal,
- skipVals,
- keepCacheObjects,
- deserializePortable,
- false);
+ if (needVer) {
+ assert keepCacheObjects;
+
+ map.put((K1)key, (V1)new T2<>(cacheVal, set ? verSet : ver));
+ }
+ else {
+ ctx.addResult(map,
+ key,
+ cacheVal,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
+ }
}
if (tx0 == null || (!tx0.implicit() &&
@@ -1992,9 +1818,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
}
else {
+ assert !needVer;
+
return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) {
@Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx) {
- return tx.getAllAsync(ctx, keys, null, deserializePortable, skipVals, false, !readThrough);
+ return tx.getAllAsync(ctx, keys, deserializePortable, skipVals, false, !readThrough);
}
});
}
@@ -2028,7 +1856,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
V prevVal = syncOp(new SyncOp<V>(true) {
@Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- return (V)tx.putAllAsync(ctx, F.t(key, val), true, null, -1, filter).get().value();
+ return (V)tx.putAllAsync(ctx, F.t(key, val), true, filter).get().value();
}
@Override public String toString() {
@@ -2083,7 +1911,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return asyncOp(new AsyncOp<V>() {
@Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
- return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, filter)
+ return tx.putAllAsync(ctx, F.t(key, val), true, filter)
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
}
@@ -2122,7 +1950,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
Boolean stored = syncOp(new SyncOp<Boolean>(true) {
@Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, filter).get().success();
+ return tx.putAllAsync(ctx, F.t(key, val), false, filter).get().success();
}
@Override public String toString() {
@@ -2414,7 +2242,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return asyncOp(new AsyncOp<Boolean>() {
@Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
- return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, filter).chain(
+ return tx.putAllAsync(ctx, F.t(key, val), false, filter).chain(
(IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
@@ -2449,7 +2277,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return syncOp(new SyncOp<V>(true) {
@Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- return (V)tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value();
+ return (V)tx.putAllAsync(ctx, F.t(key, val), true, ctx.noValArray()).get().value();
}
@Override public String toString() {
@@ -2473,7 +2301,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
@Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
- return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray())
+ return tx.putAllAsync(ctx, F.t(key, val), true, ctx.noValArray())
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
}
@@ -2503,7 +2331,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
Boolean stored = syncOp(new SyncOp<Boolean>(true) {
@Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).get().success();
+ return tx.putAllAsync(ctx, F.t(key, val), false, ctx.noValArray()).get().success();
}
@Override public String toString() {
@@ -2532,7 +2360,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
@Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
- return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).chain(
+ return tx.putAllAsync(ctx, F.t(key, val), false, ctx.noValArray()).chain(
(IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
}
@@ -2558,7 +2386,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return syncOp(new SyncOp<V>(true) {
@Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- return (V)tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.hasValArray()).get().value();
+ return (V)tx.putAllAsync(ctx, F.t(key, val), true, ctx.hasValArray()).get().value();
}
@Override public String toString() {
@@ -2582,7 +2410,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
@Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
- return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.hasValArray()).chain(
+ return tx.putAllAsync(ctx, F.t(key, val), true, ctx.hasValArray()).chain(
(IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
}
@@ -2608,7 +2436,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return syncOp(new SyncOp<Boolean>(true) {
@Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).get().success();
+ return tx.putAllAsync(ctx, F.t(key, val), false, ctx.hasValArray()).get().success();
}
@Override public String toString() {
@@ -2628,7 +2456,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return asyncOp(new AsyncOp<Boolean>() {
@Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
- return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain(
+ return tx.putAllAsync(ctx, F.t(key, val), false, ctx.hasValArray()).chain(
(IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
@@ -2655,7 +2483,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
- return tx.putAllAsync(ctx, F.t(key, newVal), false, null, -1, ctx.equalsValArray(oldVal)).get()
+ return tx.putAllAsync(ctx, F.t(key, newVal), false, ctx.equalsValArray(oldVal)).get()
.success();
}
@@ -2692,7 +2520,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
}
- return tx.putAllAsync(ctx, F.t(key, newVal), false, null, -1, ctx.equalsValArray(oldVal)).chain(
+ return tx.putAllAsync(ctx, F.t(key, newVal), false, ctx.equalsValArray(oldVal)).chain(
(IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
}
@@ -2723,7 +2551,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
syncOp(new SyncInOp(m.size() == 1) {
@Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- tx.putAllAsync(ctx, m, false, null, -1, CU.empty0()).get();
+ tx.putAllAsync(ctx, m, false, CU.empty0()).get();
}
@Override public String toString() {
@@ -2747,7 +2575,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return asyncOp(new AsyncInOp(m.keySet()) {
@Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
- return tx.putAllAsync(ctx, m, false, null, -1, CU.empty0()).chain(RET2NULL);
+ return tx.putAllAsync(ctx, m, false, CU.empty0()).chain(RET2NULL);
}
@Override public String toString() {
@@ -2769,7 +2597,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
V prevVal = syncOp(new SyncOp<V>(true) {
@Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- V ret = tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, CU.empty0()).get().value();
+ V ret = tx.removeAllAsync(ctx,
+ Collections.singletonList(key),
+ /*retval*/true,
+ CU.empty0(),
+ /*singleRmv*/false).get().value();
if (ctx.config().getInterceptor() != null)
return (V)ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2();
@@ -2802,8 +2634,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
@Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
// TODO should we invoke interceptor here?
- return tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, CU.empty0())
- .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
+ return tx.removeAllAsync(ctx,
+ Collections.singletonList(key),
+ /*retval*/true,
+ CU.empty0(),
+ /*singleRmv*/false).chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
}
@Override public String toString() {
@@ -2849,7 +2684,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
syncOp(new SyncInOp(keys.size() == 1) {
@Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- tx.removeAllAsync(ctx, keys, null, false, CU.empty0()).get();
+ tx.removeAllAsync(ctx,
+ keys,
+ /*retval*/false,
+ CU.empty0(),
+ /*singleRmv*/false).get();
}
@Override public String toString() {
@@ -2875,7 +2714,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<Object> fut = asyncOp(new AsyncInOp(keys) {
@Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
- return tx.removeAllAsync(ctx, keys, null, false, CU.empty0()).chain(RET2NULL);
+ return tx.removeAllAsync(ctx,
+ keys,
+ /*retval*/false,
+ CU.empty0(),
+ /*singleRmv*/false).chain(RET2NULL);
}
@Override public String toString() {
@@ -2902,7 +2745,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
boolean rmv = syncOp(new SyncOp<Boolean>(true) {
@Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, CU.empty0()).get().success();
+ return tx.removeAllAsync(ctx,
+ Collections.singletonList(key),
+ /*retval*/false,
+ CU.empty0(),
+ /*singleRmv*/true).get().success();
}
@Override public String toString() {
@@ -2940,7 +2787,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
@Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
- return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, filter).chain(
+ return tx.removeAllAsync(ctx,
+ Collections.singletonList(key),
+ /*retval*/false,
+ filter,
+ /*singleRmv*/true).chain(
(IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
}
@@ -2970,9 +2821,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return tx.removeAllAsync(ctx,
Collections.singletonList(key),
- null,
- true,
- ctx.equalsValArray(val)).get();
+ /*retval*/true,
+ ctx.equalsValArray(val),
+ /*singleRmv*/false).get();
}
@Override public String toString() {
@@ -3037,8 +2888,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return tx.putAllAsync(ctx,
F.t(key, newVal),
true,
- null,
- -1,
ctx.equalsValArray(oldVal)).get();
}
@@ -3066,13 +2915,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return new GridFinishedFuture<>(e);
}
- IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture)tx.removeAllAsync(ctx,
+ return (IgniteInternalFuture)tx.removeAllAsync(ctx,
Collections.singletonList(key),
- null,
- true,
- ctx.equalsValArray(val));
-
- return fut;
+ /*retval*/true,
+ ctx.equalsValArray(val),
+ /*singleRmv*/false);
}
@Override public String toString() {
@@ -3100,14 +2947,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return new GridFinishedFuture<>(e);
}
- IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture)tx.putAllAsync(ctx,
+ return (IgniteInternalFuture)tx.putAllAsync(ctx,
F.t(key, newVal),
true,
- null,
- -1,
ctx.equalsValArray(oldVal));
-
- return fut;
}
@Override public String toString() {
@@ -3135,8 +2978,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(val);
- return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false,
- ctx.equalsValArray(val)).get().success();
+ return tx.removeAllAsync(ctx,
+ Collections.singletonList(key),
+ /*retval*/false,
+ ctx.equalsValArray(val),
+ /*singleRmv*/false).get().success();
}
@Override public String toString() {
@@ -3175,8 +3021,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
}
- return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false,
- ctx.equalsValArray(val)).chain(
+ return tx.removeAllAsync(ctx,
+ Collections.singletonList(key),
+ /*retval*/false,
+ ctx.equalsValArray(val),
+ /*singleRmv*/false).chain(
(IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
}
@@ -3200,7 +3049,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return asyncOp(new AsyncInOp(keys) {
@Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
- return tx.removeAllAsync(ctx, keys, null, false, null);
+ return tx.removeAllAsync(ctx,
+ keys,
+ /*retval*/false,
+ null,
+ /*singleRmv*/false);
}
@Override public String toString() {
@@ -4579,7 +4432,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
!ctx.config().isReadFromBackup(),
/*skip tx*/false,
null,
- null,
taskName,
deserializePortable,
false,
@@ -4741,41 +4593,34 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
boolean deserializePortable)
throws IgniteCheckedException, GridCacheEntryRemovedException
{
- try {
- CacheObject val = entry.innerGet(
- null,
- false,
- false,
- false,
- true,
- false,
- false,
- false,
- null,
- null,
- null,
- null);
-
- if (val == null)
- return null;
+ CacheObject val = entry.innerGet(
+ null,
+ false,
+ false,
+ false,
+ true,
+ false,
+ false,
+ false,
+ null,
+ null,
+ null,
+ null);
- KeyCacheObject key = entry.key();
+ if (val == null)
+ return null;
- Object key0 = key.value(ctx.cacheObjectContext(), true);
- Object val0 = val.value(ctx.cacheObjectContext(), true);
+ KeyCacheObject key = entry.key();
- if (deserializePortable) {
- key0 = ctx.unwrapPortableIfNeeded(key0, true);
- val0 = ctx.unwrapPortableIfNeeded(val0, true);
- }
+ Object key0 = key.value(ctx.cacheObjectContext(), true);
+ Object val0 = val.value(ctx.cacheObjectContext(), true);
- return new CacheEntryImpl<>((K)key0, (V)val0, entry.version());
+ if (deserializePortable) {
+ key0 = ctx.unwrapPortableIfNeeded(key0, true);
+ val0 = ctx.unwrapPortableIfNeeded(val0, true);
}
- catch (GridCacheFilterFailedException ignore) {
- assert false;
- return null;
- }
+ return new CacheEntryImpl<>((K)key0, (V)val0, entry.version());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 430590a..50b01c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.typedef.T2;
import org.jetbrains.annotations.Nullable;
/**
@@ -286,12 +287,10 @@ public interface GridCacheEntryEx {
* @param subjId Subject ID initiated this read.
* @param transformClo Transform closure to record event.
* @param taskName Task name.
- * together with getting the value is an atomic operation.
* @param expiryPlc Expiry policy.
* @return Cached value.
* @throws IgniteCheckedException If loading value failed.
* @throws GridCacheEntryRemovedException If entry was removed.
- * @throws GridCacheFilterFailedException If filter failed.
*/
@Nullable public CacheObject innerGet(@Nullable IgniteInternalTx tx,
boolean readSwap,
@@ -305,7 +304,33 @@ public interface GridCacheEntryEx {
Object transformClo,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc)
- throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException;
+ throws IgniteCheckedException, GridCacheEntryRemovedException;
+
+ /**
+ * @param tx Cache transaction.
+ * @param readSwap Flag indicating whether to check swap memory.
+ * @param unmarshal Unmarshal flag.
+ * @param updateMetrics If {@code true} then metrics should be updated.
+ * @param evt Flag to signal event notification.
+ * @param subjId Subject ID initiated this read.
+ * @param transformClo Transform closure to record event.
+ * @param taskName Task name.
+ * @param expiryPlc Expiry policy.
+ * @return Cached value and entry version, or {@code null} if value is not available.
+ * @throws IgniteCheckedException If loading value failed.
+ * @throws GridCacheEntryRemovedException If entry was removed.
+ */
+ @Nullable public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+ IgniteInternalTx tx,
+ boolean readSwap,
+ boolean unmarshal,
+ boolean updateMetrics,
+ boolean evt,
+ UUID subjId,
+ Object transformClo,
+ String taskName,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc)
+ throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
* Reloads entry from underlying storage.
@@ -334,6 +359,7 @@ public interface GridCacheEntryEx {
* @param explicitVer Explicit version (if any).
* @param subjId Subject ID initiated this update.
* @param taskName Task name.
+ * @param dhtVer Dht version for near cache entry.
* @return Tuple containing success flag and old value. If success is {@code false},
* then value is {@code null}.
* @throws IgniteCheckedException If storing value failed.
@@ -355,7 +381,8 @@ public interface GridCacheEntryEx {
long drExpireTime,
@Nullable GridCacheVersion explicitVer,
@Nullable UUID subjId,
- String taskName
+ String taskName,
+ @Nullable GridCacheVersion dhtVer
) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
@@ -372,6 +399,7 @@ public interface GridCacheEntryEx {
* @param explicitVer Explicit version (if any).
* @param subjId Subject ID initiated this update.
* @param taskName Task name.
+ * @param dhtVer Dht version for near cache entry.
* @return Tuple containing success flag and old value. If success is {@code false},
* then value is {@code null}.
* @throws IgniteCheckedException If remove failed.
@@ -390,7 +418,8 @@ public interface GridCacheEntryEx {
GridDrType drType,
@Nullable GridCacheVersion explicitVer,
@Nullable UUID subjId,
- String taskName
+ String taskName,
+ @Nullable GridCacheVersion dhtVer
) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
@@ -409,6 +438,7 @@ public interface GridCacheEntryEx {
* @param primary If update is performed on primary node (the one which assigns version).
* @param checkVer Whether update should check current version and ignore update if current version is
* greater than passed in.
+ * @param topVer Topology version.
* @param filter Optional filter to check.
* @param drType DR type.
* @param conflictTtl Conflict TTL (if any).
@@ -510,12 +540,17 @@ public interface GridCacheEntryEx {
*
* @param tx Cache transaction.
* @param timeout Timeout for lock acquisition.
+ * @param serOrder Version for serializable transactions ordering.
+ * @param serReadVer Optional read entry version for optimistic serializable transaction.
* @return {@code True} if lock was acquired, {@code false} otherwise.
* @throws GridCacheEntryRemovedException If this entry is obsolete.
* @throws GridDistributedLockCancelledException If lock has been cancelled.
*/
- public boolean tmLock(IgniteInternalTx tx, long timeout) throws GridCacheEntryRemovedException,
- GridDistributedLockCancelledException;
+ public boolean tmLock(IgniteInternalTx tx,
+ long timeout,
+ @Nullable GridCacheVersion serOrder,
+ @Nullable GridCacheVersion serReadVer)
+ throws GridCacheEntryRemovedException, GridDistributedLockCancelledException;
/**
* Unlocks acquired lock.
@@ -566,6 +601,15 @@ public interface GridCacheEntryEx {
public GridCacheVersion version() throws GridCacheEntryRemovedException;
/**
+ * Checks if there was read/write conflict in serializable transaction.
+ *
+ * @param serReadVer Version read in serializable transaction.
+ * @return {@code True} if version check passed, {@code false} if conflict was detected.
+ * @throws GridCacheEntryRemovedException If entry has been removed.
+ */
+ public boolean checkSerializableReadVersion(GridCacheVersion serReadVer) throws GridCacheEntryRemovedException;
+
+ /**
* Peeks into entry without loading value or updating statistics.
*
* @param heap Read from heap flag.
@@ -653,11 +697,11 @@ public interface GridCacheEntryEx {
* @param val New value.
* @param curVer Version to match or {@code null} if match is not required.
* @param newVer Version to set.
- * @return {@code True} if versioned matched.
+ * @return Non null version if value was set.
* @throws IgniteCheckedException If index could not be updated.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- public boolean versionedValue(CacheObject val,
+ public GridCacheVersion versionedValue(CacheObject val,
@Nullable GridCacheVersion curVer,
@Nullable GridCacheVersion newVer)
throws IgniteCheckedException, GridCacheEntryRemovedException;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 4bf0aa1..2111594 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -37,12 +37,14 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.version.GridCachePlainVersionedEntry;
@@ -59,6 +61,7 @@ import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -673,7 +676,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
String taskName,
@Nullable IgniteCacheExpiryPolicy expirePlc)
throws IgniteCheckedException, GridCacheEntryRemovedException {
- return innerGet0(tx,
+ return (CacheObject)innerGet0(tx,
readSwap,
readThrough,
evt,
@@ -683,12 +686,39 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
subjId,
transformClo,
taskName,
- expirePlc);
+ expirePlc,
+ false);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+ IgniteInternalTx tx,
+ boolean readSwap,
+ boolean unmarshal,
+ boolean updateMetrics,
+ boolean evt,
+ UUID subjId,
+ Object transformClo,
+ String taskName,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc)
+ throws IgniteCheckedException, GridCacheEntryRemovedException {
+ return (T2<CacheObject, GridCacheVersion>)innerGet0(tx,
+ readSwap,
+ /*readThrough*/false, // innerGet0 asserts !(retVer && readThrough).
+ evt,
+ unmarshal,
+ updateMetrics,
+ false,
+ subjId,
+ transformClo,
+ taskName,
+ expiryPlc,
+ /*retVer*/true);
}
/** {@inheritDoc} */
@SuppressWarnings({"unchecked", "RedundantTypeArguments", "TooBroadScope"})
- private CacheObject innerGet0(IgniteInternalTx tx,
+ private Object innerGet0(IgniteInternalTx tx,
boolean readSwap,
boolean readThrough,
boolean evt,
@@ -698,8 +728,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
UUID subjId,
Object transformClo,
String taskName,
- @Nullable IgniteCacheExpiryPolicy expiryPlc)
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean retVer)
throws IgniteCheckedException, GridCacheEntryRemovedException {
+ assert !(retVer && readThrough);
+
// Disable read-through if there is no store.
if (readThrough && !cctx.readThrough())
readThrough = false;
@@ -710,6 +743,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject ret = null;
GridCacheVersion startVer;
+ GridCacheVersion resVer = null;
boolean expired = false;
@@ -840,11 +874,18 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (ret != null && expiryPlc != null)
updateTtl(expiryPlc);
+
+ if (retVer) {
+ resVer = isNear() ? ((GridNearCacheEntry)this).dhtVersion() : this.ver;
+
+ if (resVer == null)
+ ret = null;
+ }
}
if (ret != null)
// If return value is consistent, then done.
- return ret;
+ return retVer ? new T2<>(ret, resVer) : ret;
boolean loadedFromStore = false;
@@ -906,6 +947,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
+ assert ret == null || !retVer;
+
return ret;
}
@@ -1015,7 +1058,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
long drExpireTime,
@Nullable GridCacheVersion explicitVer,
@Nullable UUID subjId,
- String taskName
+ String taskName,
+ @Nullable GridCacheVersion dhtVer
) throws IgniteCheckedException, GridCacheEntryRemovedException {
CacheObject old;
@@ -1035,6 +1079,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
synchronized (this) {
checkObsolete();
+ if (isNear()) {
+ assert dhtVer != null;
+
+ // It is possible that 'get' could load more recent value.
+ if (!((GridNearCacheEntry)this).recordDhtVersion(dhtVer))
+ return new GridCacheUpdateTxResult(false, null);
+ }
+
assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
"Transaction does not own lock for update [entry=" + this + ", tx=" + tx + ']';
@@ -1169,8 +1221,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridDrType drType,
@Nullable GridCacheVersion explicitVer,
@Nullable UUID subjId,
- String taskName
- ) throws IgniteCheckedException, GridCacheEntryRemovedException {
+ String taskName,
+ @Nullable GridCacheVersion dhtVer
+ ) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert cctx.transactional();
CacheObject old;
@@ -1194,6 +1247,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
synchronized (this) {
checkObsolete();
+ if (isNear()) {
+ assert dhtVer != null;
+
+ // It is possible that 'get' could load more recent value.
+ if (!((GridNearCacheEntry)this).recordDhtVersion(dhtVer))
+ return new GridCacheUpdateTxResult(false, null);
+ }
+
assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
"Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']';
@@ -2549,6 +2610,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/**
+ * @return {@code True} if this entry should not be evicted from cache (this implementation returns {@code false}).
+ */
+ protected boolean evictionDisabled() {
+ return false;
+ }
+ }
+
+ /**
* <p>
* Note that {@link #onMarkedObsolete()} should always be called after this method
* returns {@code true}.
@@ -2560,6 +2628,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
protected final boolean markObsolete0(GridCacheVersion ver, boolean clear) {
assert Thread.holdsLock(this);
+ if (evictionDisabled()) {
+ assert !obsolete() : this;
+
+ return false;
+ }
+
GridCacheVersion obsoleteVer = obsoleteVersionExtras();
if (ver != null) {
@@ -2790,6 +2864,25 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return ver;
}
+ /** {@inheritDoc} */
+ @Override public synchronized boolean checkSerializableReadVersion(GridCacheVersion serReadVer)
+ throws GridCacheEntryRemovedException {
+ checkObsolete();
+
+ // Read version equals current entry version: check passed.
+ if (serReadVer.equals(ver))
+ return true;
+
+ // Otherwise only the two marker versions can pass, depending on whether entry is empty.
+ boolean hasNoVal = isStartVersion() || deletedUnlocked();
+
+ if (serReadVer.equals(IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER))
+ return hasNoVal;
+
+ // Any other mismatching version fails the check.
+ return serReadVer.equals(IgniteTxEntry.SER_READ_NOT_EMPTY_VER) && !hasNoVal;
+ }
+
/**
* Gets hash value for the entry key.
*
@@ -3115,16 +3208,22 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
- @Override public synchronized boolean versionedValue(CacheObject val,
+ @Override public synchronized GridCacheVersion versionedValue(CacheObject val,
GridCacheVersion curVer,
GridCacheVersion newVer)
throws IgniteCheckedException, GridCacheEntryRemovedException {
+
checkObsolete();
if (curVer == null || curVer.equals(ver)) {
if (val != this.val) {
+ GridCacheMvcc mvcc = mvccExtras();
+
+ if (mvcc != null && !mvcc.isEmpty())
+ return null;
+
if (newVer == null)
- newVer = nextVersion();
+ newVer = cctx.versions().next();
CacheObject old = rawGetOrUnmarshalUnlocked(false);
@@ -3144,12 +3243,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// Version does not change for load ops.
update(val, expTime, ttl, newVer);
- }
- return true;
+ return newVer;
+ }
}
- return false;
+ return null;
}
/**
@@ -3683,6 +3782,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
if (F.isEmptyOrNulls(filter)) {
synchronized (this) {
+ if (evictionDisabled()) {
+ assert !obsolete();
+
+ return false;
+ }
+
if (obsoleteVersionExtras() != null)
return true;
@@ -3727,6 +3832,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return false;
synchronized (this) {
+ if (evictionDisabled()) {
+ assert !obsolete();
+
+ return false;
+ }
+
if (obsoleteVersionExtras() != null)
return true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
index c2102bd..12583ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
@@ -53,6 +54,32 @@ public final class GridCacheMvcc {
/** Logger. */
private static volatile IgniteLogger log;
+ /** Orders serializable tx versions by global time, then by node order, then by order. */
+ private static final Comparator<GridCacheVersion> SER_VER_COMPARATOR = new Comparator<GridCacheVersion>() {
+ @Override public int compare(GridCacheVersion ver1, GridCacheVersion ver2) {
+ long time1 = ver1.globalTime();
+ long time2 = ver2.globalTime();
+
+ if (time1 == time2) {
+ int nodeOrder1 = ver1.nodeOrder();
+ int nodeOrder2 = ver2.nodeOrder();
+
+ if (nodeOrder1 == nodeOrder2) {
+ long order1 = ver1.order();
+ long order2 = ver2.order();
+
+ assert order1 != order2;
+ // With assertions off, equal versions must compare as 0 (Comparator contract).
+ return Long.compare(order1, order2);
+ }
+ else
+ return nodeOrder1 > nodeOrder2 ? 1 : -1;
+ }
+ else
+ return time1 > time2 ? 1 : -1;
+ }
+ };
+
/** Cache context. */
@GridToStringExclude
private final GridCacheContext<?, ?> cctx;
@@ -160,8 +187,9 @@ public final class GridCacheMvcc {
/**
* @param cand Candidate to add.
+ * @return {@code False} if failed to add candidate and transaction should be cancelled.
*/
- private void add0(GridCacheMvccCandidate cand) {
+ private boolean add0(GridCacheMvccCandidate cand) {
assert cand != null;
// Local.
@@ -171,31 +199,59 @@ public final class GridCacheMvcc {
if (!cand.nearLocal()) {
if (!locs.isEmpty()) {
- GridCacheMvccCandidate c = locs.getFirst();
+ if (cand.serializable()) {
+ GridCacheMvccCandidate last = locs.getLast();
+
+ if (!last.serializable())
+ return false;
+
+ GridCacheVersion lastOrder = last.serializableOrder();
+
+ assert lastOrder != null : last;
+
+ GridCacheVersion candOrder = cand.serializableOrder();
- if (c.owner()) {
+ assert candOrder != null : cand;
+
+ int cmp = SER_VER_COMPARATOR.compare(lastOrder, candOrder);
+
+ assert cmp != 0;
+
+ if (cmp > 0)
+ return false;
+
+ locs.addLast(cand);
+
+ return true;
+ }
+
+ GridCacheMvccCandidate first = locs.getFirst();
+
+ if (first.owner()) {
// If reentry, add at the beginning. Note that
// no reentry happens for DHT-local candidates.
- if (!cand.dhtLocal() && c.threadId() == cand.threadId()) {
+ if (!cand.dhtLocal() && first.threadId() == cand.threadId()) {
+ assert !first.serializable();
+
cand.setOwner();
cand.setReady();
cand.setReentry();
locs.addFirst(cand);
- return;
+ return true;
}
}
// Iterate in reverse order.
for (ListIterator<GridCacheMvccCandidate> it = locs.listIterator(locs.size()); it.hasPrevious(); ) {
- c = it.previous();
+ GridCacheMvccCandidate c = it.previous();
assert !c.version().equals(cand.version()) : "Versions can't match [existing=" + c +
", new=" + cand + ']';
- // Add after the owner.
- if (c.owner()) {
+ // Add after the owner or serializable tx.
+ if (c.owner() || c.serializable()) {
// Threads are checked above.
assert cand.dhtLocal() || c.threadId() != cand.threadId();
@@ -204,7 +260,7 @@ public final class GridCacheMvcc {
it.add(cand);
- return;
+ return true;
}
// If not the owner, add after the lesser version.
@@ -214,7 +270,7 @@ public final class GridCacheMvcc {
it.add(cand);
- return;
+ return true;
}
}
}
@@ -228,6 +284,8 @@ public final class GridCacheMvcc {
}
// Remote.
else {
+ assert !cand.serializable() : cand;
+
if (rmts == null)
rmts = new LinkedList<>();
@@ -241,12 +299,14 @@ public final class GridCacheMvcc {
if (cand.owner())
cur.setOwner();
- return;
+ return true;
}
// Either list is empty or candidate is last.
rmts.add(cand);
}
+
+ return true;
}
/**
@@ -456,6 +516,7 @@ public final class GridCacheMvcc {
threadId,
ver,
timeout,
+ /*serializable order*/null,
reenter,
tx,
implicitSingle,
@@ -470,6 +531,7 @@ public final class GridCacheMvcc {
* @param threadId Thread ID.
* @param ver Lock version.
* @param timeout Lock acquisition timeout.
+ * @param serOrder Version for serializable transactions ordering.
* @param reenter Reentry flag ({@code true} if reentry is allowed).
* @param tx Transaction flag.
* @param implicitSingle Implicit flag.
@@ -484,6 +546,7 @@ public final class GridCacheMvcc {
long threadId,
GridCacheVersion ver,
long timeout,
+ @Nullable GridCacheVersion serOrder,
boolean reenter,
boolean tx,
boolean implicitSingle,
@@ -528,12 +591,23 @@ public final class GridCacheMvcc {
tx,
implicitSingle,
/*near-local*/false,
- dhtLoc
+ dhtLoc,
+ serOrder
);
- cctx.mvcc().addLocal(cand);
+ if (serOrder == null) {
+ cctx.mvcc().addLocal(cand);
- add0(cand);
+ boolean add = add0(cand);
+
+ assert add : cand;
+ }
+ else {
+ if (!add0(cand))
+ return null;
+
+ cctx.mvcc().addLocal(cand);
+ }
return cand;
}
@@ -575,7 +649,8 @@ public final class GridCacheMvcc {
tx,
implicitSingle,
nearLoc,
- false
+ false,
+ null
);
addRemote(cand);
@@ -596,11 +671,28 @@ public final class GridCacheMvcc {
* @param implicitSingle Implicit flag.
* @return Add remote candidate.
*/
- public GridCacheMvccCandidate addNearLocal(GridCacheEntryEx parent, UUID nodeId,
- @Nullable UUID otherNodeId, long threadId, GridCacheVersion ver, long timeout, boolean tx,
+ public GridCacheMvccCandidate addNearLocal(GridCacheEntryEx parent,
+ UUID nodeId,
+ @Nullable UUID otherNodeId,
+ long threadId,
+ GridCacheVersion ver,
+ long timeout,
+ boolean tx,
boolean implicitSingle) {
- GridCacheMvccCandidate cand = new GridCacheMvccCandidate(parent, nodeId, otherNodeId, null, threadId, ver,
- timeout, /*local*/true, /*reentry*/false, tx, implicitSingle, /*near loc*/true, /*dht loc*/false);
+ GridCacheMvccCandidate cand = new GridCacheMvccCandidate(parent,
+ nodeId,
+ otherNodeId,
+ null,
+ threadId,
+ ver,
+ timeout,
+ /*local*/true,
+ /*reentry*/false,
+ tx,
+ implicitSingle,
+ /*near loc*/true,
+ /*dht loc*/false,
+ null);
add0(cand);
@@ -855,9 +947,22 @@ public final class GridCacheMvcc {
}
if (locs != null) {
+ boolean first = true;
+
for (ListIterator<GridCacheMvccCandidate> it = locs.listIterator(); it.hasNext(); ) {
GridCacheMvccCandidate cand = it.next();
+ if (first && cand.serializable()) {
+ if (cand.owner() || !cand.ready())
+ return;
+
+ cand.setOwner();
+
+ return;
+ }
+
+ first = false;
+
if (cand.owner())
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
index f19a054..aba8318 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
@@ -126,6 +126,9 @@ public class GridCacheMvccCandidate implements Externalizable,
@GridToStringInclude
private transient volatile GridCacheVersion ownerVer;
+ /** Version for serializable transactions ordering, {@code null} if candidate is not serializable. */
+ private GridCacheVersion serOrder;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -147,6 +150,7 @@ public class GridCacheMvccCandidate implements Externalizable,
* @param singleImplicit Single-key-implicit-transaction flag.
* @param nearLoc Near-local flag.
* @param dhtLoc DHT local flag.
+ * @param serOrder Version for serializable transactions ordering.
*/
public GridCacheMvccCandidate(
GridCacheEntryEx parent,
@@ -161,7 +165,9 @@ public class GridCacheMvccCandidate implements Externalizable,
boolean tx,
boolean singleImplicit,
boolean nearLoc,
- boolean dhtLoc) {
+ boolean dhtLoc,
+ @Nullable GridCacheVersion serOrder
+ ) {
assert nodeId != null;
assert ver != null;
assert parent != null;
@@ -173,6 +179,7 @@ public class GridCacheMvccCandidate implements Externalizable,
this.threadId = threadId;
this.ver = ver;
this.timeout = timeout;
+ this.serOrder = serOrder;
mask(LOCAL, loc);
mask(REENTRY, reentry);
@@ -244,7 +251,8 @@ public class GridCacheMvccCandidate implements Externalizable,
tx(),
singleImplicit(),
nearLocal(),
- dhtLocal());
+ dhtLocal(),
+ serializableOrder());
reentry.topVer = topVer;
@@ -452,6 +460,20 @@ public class GridCacheMvccCandidate implements Externalizable,
}
/**
+ * @return Serializable transaction flag ({@code true} if serializable order version is set).
+ */
+ public boolean serializable() {
+ return serOrder != null;
+ }
+
+ /**
+ * @return Version for serializable transactions ordering, {@code null} if it was not set.
+ */
+ @Nullable public GridCacheVersion serializableOrder() {
+ return serOrder;
+ }
+
+ /**
* @return {@code True} if this candidate is a reentry.
*/
public boolean reentry() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 578ad6c..5bf4ac7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -139,7 +139,6 @@ import static org.apache.ignite.internal.IgniteComponentType.JTA;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
-import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/**
* Cache processor.
@@ -398,15 +397,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
throw new IgniteCheckedException("Cannot start cache in PRIVATE or ISOLATED deployment mode: " +
ctx.config().getDeploymentMode());
- if (!c.getTransactionConfiguration().isTxSerializableEnabled() &&
- c.getTransactionConfiguration().getDefaultTxIsolation() == SERIALIZABLE)
- U.warn(log,
- "Serializable transactions are disabled while default transaction isolation is SERIALIZABLE " +
- "(most likely misconfiguration - either update 'isTxSerializableEnabled' or " +
- "'defaultTxIsolationLevel' properties) for cache: " + U.maskName(cc.getName()),
- "Serializable transactions are disabled while default transaction isolation is SERIALIZABLE " +
- "for cache: " + U.maskName(cc.getName()));
-
if (cc.isWriteBehindEnabled()) {
if (cfgStore == null)
throw new IgniteCheckedException("Cannot enable write-behind (writer or store is not provided) " +
@@ -619,9 +609,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx,
ctx.config().getCacheStoreSessionListenerFactories()));
- ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)",
- !ctx.config().getTransactionConfiguration().isTxSerializableEnabled());
-
for (int i = 0; i < cfgs.length; i++) {
if (ctx.config().isDaemon() && !CU.isMarshallerCache(cfgs[i].getName()))
continue;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index d4f0d6c..a138d30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -742,7 +743,10 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
}
/** {@inheritDoc} */
- @Override public boolean tmLock(IgniteInternalTx tx, long timeout)
+ @Override public boolean tmLock(IgniteInternalTx tx,
+ long timeout,
+ @Nullable GridCacheVersion serOrder,
+ GridCacheVersion serReadVer)
throws GridCacheEntryRemovedException, GridDistributedLockCancelledException {
if (tx.local())
// Null is returned if timeout is negative and there is other lock owner.
@@ -751,8 +755,8 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
tx.xidVersion(),
tx.topologyVersion(),
timeout,
- false,
- true,
+ /*reenter*/false,
+ /*tx*/true,
tx.implicitSingle()) != null;
try {
@@ -762,7 +766,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
tx.threadId(),
tx.xidVersion(),
tx.timeout(),
- true,
+ /*tx*/true,
tx.implicitSingle(),
tx.ownedVersion(txKey())
);