You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2019/01/21 12:26:53 UTC
[ignite] branch master updated: IGNITE-10752: MVCC TX: Fix MVCC
invariants violations. This closes #5809.
This is an automated email from the ASF dual-hosted git repository.
gvvinblade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new afeee51 IGNITE-10752: MVCC TX: Fix MVCC invariants violations. This closes #5809.
afeee51 is described below
commit afeee5120c8b53550bdf6455c6ebe576c52864c8
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Mon Jan 21 15:26:41 2019 +0300
IGNITE-10752: MVCC TX: Fix MVCC invariants violations. This closes #5809.
---
.../communication/GridIoMessageFactory.java | 12 +-
.../managers/discovery/GridDiscoveryManager.java | 4 +-
.../processors/cache/GridCacheAdapter.java | 10 +-
.../processors/cache/GridCacheContext.java | 2 +-
.../processors/cache/GridCacheEntryEx.java | 13 +-
.../processors/cache/GridCacheMapEntry.java | 116 ++--
.../processors/cache/GridCacheUpdateTxResult.java | 26 -
.../internal/processors/cache/GridCacheUtils.java | 15 +-
.../cache/IgniteCacheOffheapManager.java | 81 ---
.../cache/IgniteCacheOffheapManagerImpl.java | 191 -------
.../GridDistributedTxRemoteAdapter.java | 12 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 17 +-
.../dht/GridDhtTxAbstractEnlistFuture.java | 51 +-
.../distributed/dht/GridDhtTxEnlistFuture.java | 1 -
.../distributed/dht/GridDhtTxFinishFuture.java | 41 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 6 +-
.../dht/GridDhtTxQueryAbstractEnlistFuture.java | 8 +-
.../dht/GridDhtTxQueryEnlistFuture.java | 53 +-
.../dht/GridDhtTxQueryResultsEnlistFuture.java | 1 -
.../distributed/dht/GridPartitionedGetFuture.java | 58 +-
.../dht/GridPartitionedSingleGetFuture.java | 4 +-
.../dht/NearTxQueryEnlistResultHandler.java | 31 +-
.../distributed/dht/atomic/GridDhtAtomicCache.java | 13 +-
.../dht/colocated/GridDhtColocatedCache.java | 74 +--
.../preloader/GridDhtPartitionsExchangeFuture.java | 2 +-
.../distributed/near/AckCoordinatorOnRollback.java | 54 --
.../cache/distributed/near/GridNearGetFuture.java | 15 +-
.../near/GridNearPessimisticTxPrepareFuture.java | 2 +-
.../near/GridNearTxAbstractEnlistFuture.java | 3 +-
.../distributed/near/GridNearTxEnlistFuture.java | 2 -
.../near/GridNearTxFinishAndAckFuture.java | 109 +---
.../distributed/near/GridNearTxFinishFuture.java | 24 +-
.../cache/distributed/near/GridNearTxLocal.java | 100 +---
.../distributed/near/TxTopologyVersionFuture.java | 6 +
.../cache/local/atomic/GridLocalAtomicCache.java | 13 +-
.../processors/cache/mvcc/MvccCoordinator.java | 92 ++-
.../processors/cache/mvcc/MvccDiscoveryData.java | 52 --
.../cache/mvcc/MvccPreviousCoordinatorQueries.java | 6 +
.../processors/cache/mvcc/MvccProcessor.java | 87 +--
.../processors/cache/mvcc/MvccProcessorImpl.java | 630 +++++++--------------
.../processors/cache/mvcc/MvccQueryTracker.java | 28 -
.../cache/mvcc/MvccQueryTrackerImpl.java | 326 +++++------
.../internal/processors/cache/mvcc/MvccUtils.java | 67 +--
.../cache/mvcc/StaticMvccQueryTracker.java | 17 -
.../cache/mvcc/msg/MvccWaitTxsRequest.java | 159 ------
.../GridCacheDatabaseSharedManager.java | 3 -
.../cache/persistence/GridCacheOffheapManager.java | 14 -
.../cache/transactions/IgniteTxAdapter.java | 33 +-
.../cache/transactions/IgniteTxHandler.java | 3 +-
.../cache/transactions/IgniteTxLocalAdapter.java | 69 +--
.../cache/transactions/IgniteTxManager.java | 145 +++--
.../tree/mvcc/data/MvccUpdateDataRowNative.java | 240 --------
.../processors/cache/GridCacheTestEntryEx.java | 13 +-
.../processors/cache/IgniteCacheGroupsTest.java | 1 -
.../CacheMvccAbstractCoordinatorFailoverTest.java | 4 -
...acheMvccPartitionedCoordinatorFailoverTest.java | 7 -
.../cache/mvcc/CacheMvccTransactionsTest.java | 6 +-
.../DataStreamProcessorMvccSelfTest.java | 1 -
...ocalWalModeChangeDuringRebalancingSelfTest.java | 4 +-
.../query/h2/DhtResultSetEnlistFuture.java | 2 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 17 +-
...acheMvccAbstractSqlCoordinatorFailoverTest.java | 2 -
...acheMvccContinuousQueryClientReconnectTest.java | 3 -
...eMvccPartitionedSqlCoordinatorFailoverTest.java | 6 -
.../GridIndexRebuildWithMvccEnabledSelfTest.java | 3 +-
65 files changed, 873 insertions(+), 2337 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5e7811b..be467f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -34,6 +34,8 @@ import org.apache.ignite.internal.managers.checkpoint.GridCheckpointRequest;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.managers.deployment.GridDeploymentRequest;
import org.apache.ignite.internal.managers.deployment.GridDeploymentResponse;
+import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest;
+import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessage;
@@ -46,8 +48,6 @@ import org.apache.ignite.internal.processors.cache.CacheEvictionEntry;
import org.apache.ignite.internal.processors.cache.CacheInvokeDirectResult;
import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
-import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest;
-import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -137,7 +137,6 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotReq
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccWaitTxsRequest;
import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest;
import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
@@ -184,8 +183,8 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery
import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRequest;
import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse;
-import org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResult;
import org.apache.ignite.internal.processors.service.ServiceDeploymentProcessId;
+import org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResult;
import org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResultBatch;
import org.apache.ignite.internal.util.GridByteArrayList;
import org.apache.ignite.internal.util.GridIntList;
@@ -998,11 +997,6 @@ public class GridIoMessageFactory implements MessageFactory {
break;
- case 142:
- msg = new MvccWaitTxsRequest();
-
- break;
-
case 143:
msg = new GridCacheMvccEntryInfo();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 6c72258..372372c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -787,7 +787,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
discoWrk.discoCache = discoCache;
if (!isLocDaemon && !ctx.clientDisconnected()) {
- ctx.cache().context().coordinators().onLocalJoin(discoEvt);
+ ctx.cache().context().coordinators().onLocalJoin(discoEvt, discoCache);
ctx.cache().context().exchange().onLocalJoin(discoEvt, discoCache);
@@ -849,6 +849,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted);
+ ctx.cache().context().coordinators().onLocalJoin(localJoinEvent(), discoCache);
+
ctx.cache().context().exchange().onLocalJoin(localJoinEvent(), discoCache);
ctx.service().onLocalJoin(localJoinEvent(), discoCache);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index e5a41bb..a3e1707 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -49,7 +49,6 @@ import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCacheRestartingException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -1978,8 +1977,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
tx = checkCurrentTx();
}
- if (tx == null || tx.implicit()) {
- assert !ctx.mvccEnabled() || mvccSnapshot != null;
+ if (ctx.mvccEnabled() || tx == null || tx.implicit()) {
+ assert (mvccSnapshot == null) == !ctx.mvccEnabled();
Map<KeyCacheObject, EntryGetResult> misses = null;
@@ -2085,7 +2084,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
expiry,
!deserializeBinary,
- mvccSnapshot,
readerArgs);
assert res != null;
@@ -2110,7 +2108,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
taskName,
expiry,
!deserializeBinary,
- mvccSnapshot,
readerArgs);
if (res == null)
@@ -4957,8 +4954,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/*transformClo*/null,
/*taskName*/null,
/*expiryPlc*/null,
- !deserializeBinary,
- null); // TODO IGNITE-7371
+ !deserializeBinary);
if (val == null)
return null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 2f73218..2bdb275 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2140,7 +2140,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* @return {@code True} if it is possible to directly read offheap instead of using {@link GridCacheEntryEx#innerGet}.
*/
public boolean readNoEntry(@Nullable IgniteCacheExpiryPolicy expiryPlc, boolean readers) {
- return !config().isOnheapCacheEnabled() && !readers && expiryPlc == null;
+ return mvccEnabled() || (!config().isOnheapCacheEnabled() && !readers && expiryPlc == null);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 26da38b..36b524f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -272,7 +272,7 @@ public interface GridCacheEntryEx {
* @throws IgniteCheckedException If loading value failed.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- @Nullable public CacheObject innerGet(@Nullable GridCacheVersion ver,
+ public CacheObject innerGet(@Nullable GridCacheVersion ver,
@Nullable IgniteInternalTx tx,
boolean readThrough,
boolean updateMetrics,
@@ -281,8 +281,7 @@ public interface GridCacheEntryEx {
Object transformClo,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean keepBinary,
- @Nullable MvccSnapshot mvccVer)
+ boolean keepBinary)
throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
@@ -310,7 +309,6 @@ public interface GridCacheEntryEx {
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean keepBinary,
- @Nullable MvccSnapshot mvccVer,
@Nullable ReaderArguments readerArgs)
throws IgniteCheckedException, GridCacheEntryRemovedException;
@@ -332,7 +330,6 @@ public interface GridCacheEntryEx {
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean keepBinary,
- @Nullable MvccSnapshot mvccVer,
@Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
@@ -467,8 +464,7 @@ public interface GridCacheEntryEx {
@Nullable UUID subjId,
String taskName,
@Nullable GridCacheVersion dhtVer,
- @Nullable Long updateCntr,
- @Nullable MvccSnapshot mvccVer
+ @Nullable Long updateCntr
) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
@@ -510,8 +506,7 @@ public interface GridCacheEntryEx {
@Nullable UUID subjId,
String taskName,
@Nullable GridCacheVersion dhtVer,
- @Nullable Long updateCntr,
- MvccSnapshot mvccVer
+ @Nullable Long updateCntr
) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 6a728e0..d659aa5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -81,7 +81,6 @@ import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
-import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.IgniteTree;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridClosureException;
@@ -622,7 +621,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
- @Nullable @Override public final CacheObject innerGet(
+ @Override public final CacheObject innerGet(
@Nullable GridCacheVersion ver,
@Nullable IgniteInternalTx tx,
boolean readThrough,
@@ -632,8 +631,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Object transformClo,
String taskName,
@Nullable IgniteCacheExpiryPolicy expirePlc,
- boolean keepBinary,
- MvccSnapshot mvccVer)
+ boolean keepBinary)
throws IgniteCheckedException, GridCacheEntryRemovedException {
return (CacheObject)innerGet0(
ver,
@@ -648,7 +646,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
false,
keepBinary,
false,
- mvccVer,
null);
}
@@ -659,7 +656,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean keepBinary,
- MvccSnapshot mvccVer,
@Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException {
return (EntryGetResult)innerGet0(
/*ver*/null,
@@ -674,7 +670,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
true,
keepBinary,
/*reserve*/true,
- mvccVer,
readerArgs);
}
@@ -689,7 +684,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean keepBinary,
- MvccSnapshot mvccVer,
@Nullable ReaderArguments readerArgs)
throws IgniteCheckedException, GridCacheEntryRemovedException {
return (EntryGetResult)innerGet0(
@@ -705,7 +699,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
true,
keepBinary,
false,
- mvccVer,
readerArgs);
}
@@ -724,7 +717,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean retVer,
boolean keepBinary,
boolean reserveForLoad,
- @Nullable MvccSnapshot mvccVer,
@Nullable ReaderArguments readerArgs
) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert !(retVer && readThrough);
@@ -750,51 +742,39 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject val;
- if (mvccVer != null) {
- CacheDataRow row = cctx.offheap().mvccRead(cctx, key, mvccVer);
+ boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion());
- if (row != null) {
- val = row.value();
- resVer = row.version();
- }
- else
- val = null;
- }
- else {
- boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion());
-
- if (valid) {
- val = this.val;
+ if (valid) {
+ val = this.val;
- if (val == null) {
- if (isStartVersion()) {
- unswap(null, false);
+ if (val == null) {
+ if (isStartVersion()) {
+ unswap(null, false);
- val = this.val;
- }
+ val = this.val;
}
+ }
- if (val != null) {
- long expireTime = expireTimeExtras();
+ if (val != null) {
+ long expireTime = expireTimeExtras();
- if (expireTime > 0 && (expireTime < U.currentTimeMillis())) {
- if (onExpired((CacheObject)cctx.unwrapTemporary(val), null)) {
- val = null;
- evt = false;
+ if (expireTime > 0 && (expireTime < U.currentTimeMillis())) {
+ if (onExpired((CacheObject)cctx.unwrapTemporary(val), null)) {
+ val = null;
+ evt = false;
- if (cctx.deferredDelete()) {
- deferred = true;
- ver0 = ver;
- }
- else
- obsolete = true;
+ if (cctx.deferredDelete()) {
+ deferred = true;
+ ver0 = ver;
}
+ else
+ obsolete = true;
}
}
}
- else
- val = null;
}
+ else
+ val = null;
CacheObject ret = val;
@@ -920,10 +900,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
long expTime = CU.toExpireTime(ttl);
// Update indexes before actual write to entry.
- if (cctx.mvccEnabled())
- cctx.offheap().mvccInitialValue(this, ret, nextVer, expTime);
- else
- storeValue(ret, expTime, nextVer);
+ storeValue(ret, expTime, nextVer);
update(ret, expTime, ttl, nextVer, true);
@@ -1034,10 +1011,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// Update indexes.
if (ret != null) {
- if (cctx.mvccEnabled())
- cctx.offheap().mvccInitialValue(this, ret, nextVer, expTime);
- else
- storeValue(ret, expTime, nextVer);
+ storeValue(ret, expTime, nextVer);
if (cctx.deferredDelete() && !isInternal() && !detached() && deletedUnlocked())
deletedUnlocked(false);
@@ -1460,8 +1434,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable UUID subjId,
String taskName,
@Nullable GridCacheVersion dhtVer,
- @Nullable Long updateCntr,
- @Nullable MvccSnapshot mvccVer
+ @Nullable Long updateCntr
) throws IgniteCheckedException, GridCacheEntryRemovedException {
CacheObject old;
@@ -1483,8 +1456,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
ensureFreeSpace();
- GridLongList mvccWaitTxs = null;
-
lockListenerReadLock();
lockEntry();
@@ -1566,18 +1537,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
assert val != null;
- if (cctx.mvccEnabled()) {
- assert mvccVer != null;
-
- mvccWaitTxs = cctx.offheap().mvccUpdateNative(tx.local(),
- this,
- val,
- newVer,
- expireTime,
- mvccVer);
- }
- else
- storeValue(val, expireTime, newVer);
+ storeValue(val, expireTime, newVer);
if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached())
deletedUnlocked(false);
@@ -1656,7 +1616,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (intercept)
cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0, keepBinary, updateCntr0));
- return valid ? new GridCacheUpdateTxResult(true, updateCntr0, logPtr, mvccWaitTxs) :
+ return valid ? new GridCacheUpdateTxResult(true, updateCntr0, logPtr) :
new GridCacheUpdateTxResult(false, logPtr);
}
@@ -1686,8 +1646,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable UUID subjId,
String taskName,
@Nullable GridCacheVersion dhtVer,
- @Nullable Long updateCntr,
- @Nullable MvccSnapshot mvccVer
+ @Nullable Long updateCntr
) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert cctx.transactional();
@@ -1717,8 +1676,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean marked = false;
- GridLongList mvccWaitTxs = null;
-
lockListenerReadLock();
lockEntry();
@@ -1766,13 +1723,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
- if (cctx.mvccEnabled()) {
- assert mvccVer != null;
-
- mvccWaitTxs = cctx.offheap().mvccRemoveNative(tx.local(), this, mvccVer);
- }
- else
- removeValue();
+ removeValue();
update(null, 0, 0, newVer, true);
@@ -1893,7 +1844,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
cctx.config().getInterceptor().onAfterRemove(entry0);
if (valid)
- return new GridCacheUpdateTxResult(true, updateCntr0, logPtr, mvccWaitTxs);
+ return new GridCacheUpdateTxResult(true, updateCntr0, logPtr);
else
return new GridCacheUpdateTxResult(false, logPtr);
}
@@ -3675,10 +3626,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
if (val != null) {
- if (cctx.mvccEnabled())
- cctx.offheap().mvccInitialValue(this, val, newVer, expTime);
- else
- storeValue(val, expTime, newVer);
+ storeValue(val, expTime, newVer);
if (deletedUnlocked())
deletedUnlocked(false);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
index 8a68100..7df27d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
@@ -21,7 +21,6 @@ import java.util.List;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
-import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
@@ -37,9 +36,6 @@ public class GridCacheUpdateTxResult {
private long updateCntr;
/** */
- private GridLongList mvccWaitTxs;
-
- /** */
private GridFutureAdapter<GridCacheUpdateTxResult> fut;
/** */
@@ -108,21 +104,6 @@ public class GridCacheUpdateTxResult {
}
/**
- * Constructor.
- *
- * @param success Success flag.
- * @param updateCntr Update counter.
- * @param logPtr Logger WAL pointer for the update.
- * @param mvccWaitTxs List of transactions to wait for completion.
- */
- GridCacheUpdateTxResult(boolean success, long updateCntr, WALPointer logPtr, GridLongList mvccWaitTxs) {
- this.success = success;
- this.updateCntr = updateCntr;
- this.logPtr = logPtr;
- this.mvccWaitTxs = mvccWaitTxs;
- }
-
- /**
* @return Partition update counter.
*/
public long updateCounter() {
@@ -151,13 +132,6 @@ public class GridCacheUpdateTxResult {
}
/**
- * @return List of transactions to wait for completion.
- */
- @Nullable public GridLongList mvccWaitTransactions() {
- return mvccWaitTxs;
- }
-
- /**
* @return Mvcc history rows.
*/
@Nullable public List<MvccLinkAwareSearchRow> mvccHistory() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 0cca255..8d8c0e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -58,7 +58,6 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
-import org.apache.ignite.spi.encryption.EncryptionSpi;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -68,7 +67,6 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
-import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
@@ -103,6 +101,7 @@ import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.plugin.CachePluginConfiguration;
import org.apache.ignite.plugin.security.SecurityException;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -330,18 +329,6 @@ public class GridCacheUtils {
}
};
- /** Query mapped filter. */
- public static final IgnitePredicate<GridDistributedTxMapping> FILTER_QUERY_MAPPING = new P1<GridDistributedTxMapping>() {
-
- @Override public boolean apply(GridDistributedTxMapping m) {
- return m.queryUpdate();
- }
-
- @Override public String toString() {
- return "FILTER_QUERY_MAPPING";
- }
- };
-
/** Transaction entry to key. */
private static final IgniteClosure tx2key = new C1<IgniteTxEntry, Object>() {
@Override public Object apply(IgniteTxEntry e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 7f0fc30..d307772 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -226,23 +226,6 @@ public interface IgniteCacheOffheapManager {
* @param val Value.
* @param ver Version.
* @param expireTime Expire time.
- * @return {@code True} if value was inserted.
- * @throws IgniteCheckedException If failed.
- */
- default boolean mvccInitialValue(
- GridCacheMapEntry entry,
- @Nullable CacheObject val,
- GridCacheVersion ver,
- long expireTime
- ) throws IgniteCheckedException {
- return mvccInitialValue(entry, val, ver, expireTime, null, null);
- }
-
- /**
- * @param entry Entry.
- * @param val Value.
- * @param ver Version.
- * @param expireTime Expire time.
* @param mvccVer MVCC version.
* @param newMvccVer New MVCC version.
* @return {@code True} if value was inserted.
@@ -364,37 +347,6 @@ public interface IgniteCacheOffheapManager {
) throws IgniteCheckedException;
/**
- * @param primary {@code True} if on primary node.
- * @param entry Entry.
- * @param val Value.
- * @param ver Cache version.
- * @param expireTime Expire time.
- * @param mvccSnapshot MVCC snapshot.
- * @return Transactions to wait for before finishing current transaction.
- * @throws IgniteCheckedException If failed.
- */
- GridLongList mvccUpdateNative(
- boolean primary,
- GridCacheMapEntry entry,
- CacheObject val,
- GridCacheVersion ver,
- long expireTime,
- MvccSnapshot mvccSnapshot) throws IgniteCheckedException;
-
- /**
- * @param primary {@code True} if on primary node.
- * @param entry Entry.
- * @param mvccSnapshot MVCC snapshot.
- * @return Transactions to wait for before finishing current transaction.
- * @throws IgniteCheckedException If failed.
- */
- GridLongList mvccRemoveNative(
- boolean primary,
- GridCacheMapEntry entry,
- MvccSnapshot mvccSnapshot
- ) throws IgniteCheckedException;
-
- /**
* @param entry Entry.
* @throws IgniteCheckedException If failed.
*/
@@ -914,39 +866,6 @@ public interface IgniteCacheOffheapManager {
/**
* @param cctx Cache context.
- * @param primary {@code True} if update is executed on primary node.
- * @param key Key.
- * @param val Value.
- * @param ver Version.
- * @param expireTime Expire time.
- * @param mvccSnapshot MVCC snapshot.
- * @return Update result.
- * @throws IgniteCheckedException If failed.
- */
- @Nullable GridLongList mvccUpdateNative(
- GridCacheContext cctx,
- boolean primary,
- KeyCacheObject key,
- CacheObject val,
- GridCacheVersion ver,
- long expireTime,
- MvccSnapshot mvccSnapshot) throws IgniteCheckedException;
-
- /**
- * @param cctx Cache context.
- * @param primary {@code True} if update is executed on primary node.
- * @param key Key.
- * @param mvccSnapshot MVCC snapshot.
- * @return List of transactions to wait for.
- * @throws IgniteCheckedException If failed.
- */
- @Nullable GridLongList mvccRemoveNative(GridCacheContext cctx,
- boolean primary,
- KeyCacheObject key,
- MvccSnapshot mvccSnapshot) throws IgniteCheckedException;
-
- /**
- * @param cctx Cache context.
* @param key Key.
* @throws IgniteCheckedException If failed.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 0976f63..9e7b3ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -78,7 +78,6 @@ import org.apache.ignite.internal.processors.cache.tree.RowLinkIO;
import org.apache.ignite.internal.processors.cache.tree.SearchRow;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateDataRow;
-import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateDataRowNative;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.ResultType;
import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccFirstRowTreeClosure;
@@ -582,41 +581,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public GridLongList mvccUpdateNative(
- boolean primary,
- GridCacheMapEntry entry,
- CacheObject val,
- GridCacheVersion ver,
- long expireTime,
- MvccSnapshot mvccSnapshot) throws IgniteCheckedException {
- if (entry.detached() || entry.isNear())
- return null;
-
- return dataStore(entry.localPartition()).mvccUpdateNative(entry.context(),
- primary,
- entry.key(),
- val,
- ver,
- expireTime,
- mvccSnapshot);
- }
-
- /** {@inheritDoc} */
- @Override public GridLongList mvccRemoveNative(
- boolean primary,
- GridCacheMapEntry entry,
- MvccSnapshot mvccSnapshot
- ) throws IgniteCheckedException {
- if (entry.detached() || entry.isNear())
- return null;
-
- return dataStore(entry.localPartition()).mvccRemoveNative(entry.context(),
- primary,
- entry.key(),
- mvccSnapshot);
- }
-
- /** {@inheritDoc} */
@Override public void mvccRemoveAll(GridCacheMapEntry entry) throws IgniteCheckedException {
if (entry.detached() || entry.isNear())
return;
@@ -2246,161 +2210,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public GridLongList mvccUpdateNative(
- GridCacheContext cctx,
- boolean primary,
- KeyCacheObject key,
- CacheObject val,
- GridCacheVersion ver,
- long expireTime,
- MvccSnapshot mvccSnapshot) throws IgniteCheckedException {
- assert mvccSnapshot != null;
- assert primary || mvccSnapshot.activeTransactions().size() == 0 : mvccSnapshot;
-
- if (!busyLock.enterBusy())
- throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
-
- try {
- int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
-
- CacheObjectContext coCtx = cctx.cacheObjectContext();
-
- // Make sure value bytes initialized.
- key.valueBytes(coCtx);
- val.valueBytes(coCtx);
-
- MvccUpdateDataRowNative updateRow = new MvccUpdateDataRowNative(
- key,
- val,
- ver,
- expireTime,
- mvccSnapshot,
- null,
- partId,
- cctx);
-
- assert cctx.shared().database().checkpointLockIsHeldByThread();
-
- dataTree.iterate(new MvccMaxSearchRow(cacheId, key), new MvccMinSearchRow(cacheId, key), updateRow);
-
- ResultType res = updateRow.resultType();
-
- if (res == ResultType.VERSION_FOUND) {
- // Do nothing, except cleaning up not needed versions
- cleanup(cctx, updateRow.cleanupRows());
-
- return null;
- }
-
- CacheDataRow oldRow = null;
-
- if (res == ResultType.PREV_NOT_NULL) {
- oldRow = updateRow.oldRow();
-
- assert oldRow != null && oldRow.link() != 0 : oldRow;
-
- oldRow.key(key);
-
- rowStore.updateDataRow(oldRow.link(), mvccUpdateMarker, mvccSnapshot, grp.statisticsHolderData());
- }
- else
- assert res == ResultType.PREV_NULL;
-
- if (!grp.storeCacheIdInDataPage() && updateRow.cacheId() != CU.UNDEFINED_CACHE_ID) {
- updateRow.cacheId(CU.UNDEFINED_CACHE_ID);
-
- rowStore.addRow(updateRow, grp.statisticsHolderData());
-
- updateRow.cacheId(cctx.cacheId());
- }
- else
- rowStore.addRow(updateRow, grp.statisticsHolderData());
-
- boolean old = dataTree.putx(updateRow);
-
- assert !old;
-
- incrementSize(cctx.cacheId());
-
- GridCacheQueryManager qryMgr = cctx.queries();
-
- if (qryMgr.enabled())
- qryMgr.store(updateRow, null, true);
-
- updatePendingEntries(cctx, updateRow, oldRow);
-
- cleanup(cctx, updateRow.cleanupRows());
-
- return updateRow.activeTransactions();
- }
- finally {
- busyLock.leaveBusy();
- }
- }
-
- /** {@inheritDoc} */
- @Override public GridLongList mvccRemoveNative(GridCacheContext cctx,
- boolean primary,
- KeyCacheObject key,
- MvccSnapshot mvccSnapshot) throws IgniteCheckedException {
- assert mvccSnapshot != null;
- assert primary || mvccSnapshot.activeTransactions().size() == 0 : mvccSnapshot;
-
- if (!busyLock.enterBusy())
- throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
-
- try {
- int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
-
- CacheObjectContext coCtx = cctx.cacheObjectContext();
-
- // Make sure value bytes initialized.
- key.valueBytes(coCtx);
-
- MvccUpdateDataRowNative updateRow = new MvccUpdateDataRowNative(
- key,
- null,
- null,
- 0,
- mvccSnapshot,
- null,
- partId,
- cctx);
-
- assert cctx.shared().database().checkpointLockIsHeldByThread();
-
- dataTree.iterate(new MvccMaxSearchRow(cacheId, key) , new MvccMinSearchRow(cacheId, key), updateRow);
-
- ResultType res = updateRow.resultType();
-
- if (res == ResultType.VERSION_FOUND) {
- assert !primary : updateRow;
-
- // Do nothing, except cleaning up not needed versions
- cleanup(cctx, updateRow.cleanupRows());
-
- return null;
- }
- else if (res == ResultType.PREV_NOT_NULL) {
- CacheDataRow oldRow = updateRow.oldRow();
-
- assert oldRow != null && oldRow.link() != 0 : oldRow;
-
- rowStore.updateDataRow(oldRow.link(), mvccUpdateMarker, mvccSnapshot, grp.statisticsHolderData());
-
- clearPendingEntries(cctx, oldRow);
- }
-
- cleanup(cctx, updateRow.cleanupRows());
-
- return updateRow.activeTransactions();
- }
- finally {
- busyLock.leaveBusy();
- }
- }
-
- /** {@inheritDoc} */
@Override public void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
key.valueBytes(cctx.cacheObjectContext());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index c4255fb..3e68306 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -625,8 +625,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer,
- txEntry.updateCounter(),
- mvccSnapshot());
+ txEntry.updateCounter());
else {
assert val != null : txEntry;
@@ -650,8 +649,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer,
- txEntry.updateCounter(),
- mvccSnapshot());
+ txEntry.updateCounter());
txEntry.updateCounter(updRes.updateCounter());
@@ -688,8 +686,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer,
- txEntry.updateCounter(),
- mvccSnapshot());
+ txEntry.updateCounter());
txEntry.updateCounter(updRes.updateCounter());
@@ -816,8 +813,6 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
cctx.tm().commitTx(this);
- cctx.tm().mvccFinish(this, true);
-
state(COMMITTED);
}
}
@@ -926,7 +921,6 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
cctx.mvccCaching().onTxFinished(this, false);
- cctx.tm().mvccFinish(this, false);
}
}
catch (IgniteCheckedException | RuntimeException | Error e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 97ef70d..7004c85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -726,7 +726,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.threadId(),
req.txTimeout(),
req.subjectId(),
- req.taskNameHash());
+ req.taskNameHash(),
+ req.mvccSnapshot());
}
catch (IgniteCheckedException | IgniteException ex) {
GridNearTxQueryEnlistResponse res = new GridNearTxQueryEnlistResponse(req.cacheId(),
@@ -1437,8 +1438,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
null,
tx != null ? tx.resolveTaskName() : null,
null,
- req.keepBinary(),
- null); // TODO IGNITE-7371
+ req.keepBinary());
}
assert e.lockedBy(mappedVer) || ctx.mvcc().isRemoved(e.context(), mappedVer) :
@@ -1959,7 +1959,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.threadId(),
req.txTimeout(),
req.subjectId(),
- req.taskNameHash());
+ req.taskNameHash(),
+ req.mvccSnapshot());
}
catch (Throwable e) {
GridNearTxQueryResultsEnlistResponse res = new GridNearTxQueryResultsEnlistResponse(req.cacheId(),
@@ -2023,7 +2024,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.threadId(),
req.txTimeout(),
req.subjectId(),
- req.taskNameHash());
+ req.taskNameHash(),
+ req.mvccSnapshot());
}
catch (IgniteCheckedException | IgniteException ex) {
GridNearTxEnlistResponse res = new GridNearTxEnlistResponse(req.cacheId(),
@@ -2077,6 +2079,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
* @param timeout Timeout.
* @param txSubjectId Transaction subject id.
* @param txTaskNameHash Transaction task name hash.
+ * @param snapshot Mvcc snapsht.
* @return Transaction.
*/
public GridDhtTxLocal initTxTopologyVersion(UUID nodeId,
@@ -2089,7 +2092,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
long nearThreadId,
long timeout,
UUID txSubjectId,
- int txTaskNameHash) throws IgniteException, IgniteCheckedException {
+ int txTaskNameHash,
+ MvccSnapshot snapshot) throws IgniteException, IgniteCheckedException {
assert ctx.affinityNode();
@@ -2184,6 +2188,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
throw new IgniteCheckedException(msg);
}
+ tx.mvccSnapshot(snapshot);
tx.topologyVersion(topVer);
}
finally {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index 7b9c29c..25242c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -47,11 +47,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxAbstractEnlistFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxSelectForUpdateFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
@@ -122,9 +121,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
/** Future ID. */
protected final int nearMiniId;
- /** Partitions. */
- protected final int[] parts;
-
/** Transaction. */
protected final GridDhtTxLocalAdapter tx;
@@ -197,7 +193,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
* @param threadId Thread ID.
* @param nearFutId Near future id.
* @param nearMiniId Near mini future id.
- * @param parts Partitions.
* @param tx Transaction.
* @param timeout Lock acquisition timeout.
* @param cctx Cache context.
@@ -209,7 +204,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
long threadId,
IgniteUuid nearFutId,
int nearMiniId,
- @Nullable int[] parts,
GridDhtTxLocalAdapter tx,
long timeout,
GridCacheContext<?, ?> cctx,
@@ -229,7 +223,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
this.mvccSnapshot = mvccSnapshot;
this.timeout = timeout;
this.tx = tx;
- this.parts = parts;
this.filter = filter;
lockVer = tx.xidVersion();
@@ -331,7 +324,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
cctx.time().addTimeoutObject(timeoutObj);
try {
- checkPartitions(parts);
+ checkCoordinatorVersion();
UpdateSourceIterator<?> it = createIterator();
@@ -348,6 +341,8 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
else // Nothing to do for single update.
assert tx.txState().cacheIds().contains(cctx.cacheId()) && tx.txState().cacheIds().size() == 1;
+ tx.markQueryEnlisted();
+
this.it = it;
}
catch (Throwable e) {
@@ -431,8 +426,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
assert entryProc != null || !op.isInvoke();
- tx.markQueryEnlisted(mvccSnapshot);
-
boolean needOldVal = cctx.shared().mvccCaching().continuousQueryListeners(cctx, tx, key) != null;
GridCacheUpdateTxResult res;
@@ -920,38 +913,18 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
}
/**
- * Checks whether all the necessary partitions are in {@link GridDhtPartitionState#OWNING} state.
+ * Checks whether new coordinator was initialized after the snapshot is acquired.
+ *
+ * Need to fit invariant that all updates are finished before a new coordinator is initialized.
*
- * @param parts Partitions.
* @throws ClusterTopologyCheckedException If failed.
*/
- @SuppressWarnings("ForLoopReplaceableByForEach")
- private void checkPartitions(@Nullable int[] parts) throws ClusterTopologyCheckedException {
- if (cctx.isLocal() || !cctx.rebalanceEnabled())
- return;
-
- if (parts == null)
- parts = U.toIntArray(
- cctx.affinity()
- .primaryPartitions(cctx.localNodeId(), tx.topologyVersionSnapshot()));
-
- GridDhtPartitionTopology top = cctx.topology();
+ private void checkCoordinatorVersion() throws ClusterTopologyCheckedException {
+ MvccCoordinator crd = cctx.shared().coordinators().currentCoordinator();
- try {
- top.readLock();
-
- for (int i = 0; i < parts.length; i++) {
- GridDhtLocalPartition p = top.localPartition(parts[i]);
-
- if (p == null || p.state() != GridDhtPartitionState.OWNING) {
- throw new ClusterTopologyCheckedException("Cannot run update query. " +
- "Node must own all the necessary partitions.");
- }
- }
- }
- finally {
- top.readUnlock();
- }
+ if (!crd.initialized() || crd.version() != mvccSnapshot.coordinatorVersion())
+ throw new ClusterTopologyCheckedException("Cannot perform update, coordinator was changed: " +
+ "[currentCoordinator=" + crd + ", mvccSnapshot=" + mvccSnapshot + "].");
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java
index 936fe6a..5592bd0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java
@@ -87,7 +87,6 @@ public final class GridDhtTxEnlistFuture extends GridDhtTxAbstractEnlistFuture<G
threadId,
nearFutId,
nearMiniId,
- null,
tx,
timeout,
cctx,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 6c83700..b00ad56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -35,13 +35,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFutu
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.processors.cache.mvcc.MvccFuture;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -245,9 +243,11 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
if (commit && e == null)
e = this.tx.commitError();
- Throwable finishErr = mvccFinish(e != null ? e : err);
+ Throwable finishErr = e != null ? e : err;
if (super.onDone(tx, finishErr)) {
+ cctx.tm().mvccFinish(this.tx);
+
if (finishErr == null)
finishErr = this.tx.commitError();
@@ -288,7 +288,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
public void finish(boolean commit) {
boolean sync;
- assert !tx.queryEnlisted() || tx.mvccSnapshot() != null;
+ assert !tx.txState().mvccEnabled() || tx.mvccSnapshot() != null;
if (!F.isEmpty(dhtMap) || !F.isEmpty(nearMap))
sync = finish(commit, dhtMap, nearMap);
@@ -298,22 +298,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
// No backup or near nodes to send commit message to (just complete then).
sync = false;
- GridLongList waitTxs = tx.mvccWaitTransactions();
-
- if (waitTxs != null) {
- MvccSnapshot snapshot = tx.mvccSnapshot();
-
- assert snapshot != null;
-
- MvccCoordinator crd = cctx.coordinators().currentCoordinator();
-
- if (crd != null && crd.coordinatorVersion() == snapshot.coordinatorVersion()) {
- add((IgniteInternalFuture)cctx.coordinators().waitTxsFuture(crd.nodeId(), waitTxs));
-
- sync = true;
- }
- }
-
markInitialized();
if (!sync)
@@ -595,23 +579,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
return res;
}
- /**
- * Finishes MVCC transaction on the local node.
- */
- private Throwable mvccFinish(Throwable commitError) {
- try {
- cctx.tm().mvccFinish(tx, commit && commitError == null);
- }
- catch (IgniteCheckedException ex) {
- if (commitError == null)
- tx.commitError(commitError = ex);
- else
- commitError.addSuppressed(ex);
- }
-
- return commitError;
- }
-
/** {@inheritDoc} */
@Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext ctx) {
if (!isDone()) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 2e46d2b..3453380 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -410,8 +410,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
entryProc,
tx.resolveTaskName(),
null,
- keepBinary,
- null); // TODO IGNITE-7371
+ keepBinary);
if (retVal || txEntry.op() == TRANSFORM) {
if (!F.isEmpty(txEntry.entryProcessors())) {
@@ -524,8 +523,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
/*transformClo*/null,
/*taskName*/null,
/*expiryPlc*/null,
- /*keepBinary*/true,
- null); // TODO IGNITE-7371
+ /*keepBinary*/true);
}
if (oldVal != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java
index 0a26d75..d6bb8ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
/**
* Abstract future processing transaction enlisting and locking of entries produced with DML and SELECT FOR UPDATE
@@ -36,14 +35,12 @@ public abstract class GridDhtTxQueryAbstractEnlistFuture extends GridDhtTxAbstra
/**
* Constructor.
- *
* @param nearNodeId Near node ID.
* @param nearLockVer Near lock version.
* @param mvccSnapshot Mvcc snapshot.
* @param threadId Thread ID.
* @param nearFutId Near future id.
* @param nearMiniId Near mini future id.
- * @param parts Partitions.
* @param tx Transaction.
* @param timeout Lock acquisition timeout.
* @param cctx Cache context.
@@ -54,7 +51,6 @@ public abstract class GridDhtTxQueryAbstractEnlistFuture extends GridDhtTxAbstra
long threadId,
IgniteUuid nearFutId,
int nearMiniId,
- @Nullable int[] parts,
GridDhtTxLocalAdapter tx,
long timeout,
GridCacheContext<?, ?> cctx) {
@@ -64,10 +60,10 @@ public abstract class GridDhtTxQueryAbstractEnlistFuture extends GridDhtTxAbstra
threadId,
nearFutId,
nearMiniId,
- null,
tx,
timeout,
- cctx, null);
+ cctx,
+ null);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
index ed792f0..cea50d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
@@ -19,12 +19,17 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
/**
@@ -40,6 +45,9 @@ public final class GridDhtTxQueryEnlistFuture extends GridDhtTxQueryAbstractEnli
/** Query string. */
private final String qry;
+ /** Query partitions. */
+ private final int[] parts;
+
/** Query parameters. */
private final Object[] params;
@@ -90,7 +98,6 @@ public final class GridDhtTxQueryEnlistFuture extends GridDhtTxQueryAbstractEnli
threadId,
nearFutId,
nearMiniId,
- parts,
tx,
timeout,
cctx);
@@ -106,14 +113,58 @@ public final class GridDhtTxQueryEnlistFuture extends GridDhtTxQueryAbstractEnli
this.params = params;
this.flags = flags;
this.pageSize = pageSize;
+
+ this.parts = calculatePartitions(tx, parts, cctx);
}
/** {@inheritDoc} */
@Override protected UpdateSourceIterator<?> createIterator() throws IgniteCheckedException {
+ checkPartitions(parts);
+
return cctx.kernalContext().query().prepareDistributedUpdate(cctx, cacheIds, parts, schema, qry,
params, flags, pageSize, 0, tx.topologyVersionSnapshot(), mvccSnapshot, new GridQueryCancel());
}
+ /**
+ * Checks whether all the necessary partitions are in {@link GridDhtPartitionState#OWNING} state.
+ *
+ * @param parts Partitions.
+ * @throws ClusterTopologyCheckedException If failed.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ private void checkPartitions(int[] parts) throws ClusterTopologyCheckedException {
+ if (cctx.isLocal() || !cctx.rebalanceEnabled())
+ return;
+
+ GridDhtPartitionTopology top = cctx.topology();
+
+ try {
+ top.readLock();
+
+ for (int i = 0; i < parts.length; i++) {
+ GridDhtLocalPartition p = top.localPartition(parts[i]);
+
+ if (p == null || p.state() != GridDhtPartitionState.OWNING) {
+ throw new ClusterTopologyCheckedException("Cannot run update query. " +
+ "Node must own all the necessary partitions.");
+ }
+ }
+ }
+ finally {
+ top.readUnlock();
+ }
+ }
+
+ /** */
+ private int[] calculatePartitions(GridDhtTxLocalAdapter tx, int[] parts, GridCacheContext<?, ?> cctx) {
+ if (parts == null)
+ parts = U.toIntArray(
+ cctx.affinity()
+ .primaryPartitions(cctx.localNodeId(), tx.topologyVersionSnapshot()));
+
+ return parts;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtTxQueryEnlistFuture.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java
index c6140fb..ec0147e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java
@@ -70,7 +70,6 @@ public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxQueryAbstr
threadId,
nearFutId,
nearMiniId,
- null,
tx,
timeout,
cctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 883ecd9..39984b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -42,10 +42,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTrackerImpl;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotResponseListener;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLeanMap;
@@ -60,8 +57,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Colocated get future.
*/
-public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V>
- implements MvccSnapshotResponseListener {
+public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> {
/** Transaction label. */
protected final String txLbl;
@@ -69,9 +65,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
/** */
protected final MvccSnapshot mvccSnapshot;
- /** */
- private MvccQueryTracker mvccTracker;
-
/**
* @param cctx Context.
* @param keys Keys.
@@ -120,7 +113,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
recovery
);
- assert mvccSnapshot == null || cctx.mvccEnabled();
+ assert (mvccSnapshot == null) == !cctx.mvccEnabled();
this.mvccSnapshot = mvccSnapshot;
@@ -133,17 +126,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
* @return Mvcc snapshot if mvcc is enabled for cache.
*/
@Nullable private MvccSnapshot mvccSnapshot() {
- if (!cctx.mvccEnabled())
- return null;
-
- if (mvccSnapshot != null)
- return mvccSnapshot;
-
- MvccSnapshot snapshot = mvccTracker.snapshot();
-
- assert snapshot != null : "[fut=" + this + ", mvccTracker=" + mvccTracker + "]";
-
- return snapshot;
+ return mvccSnapshot;
}
/**
@@ -160,20 +143,12 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
canRemap = false;
}
- else{
+ else {
// Use affinity topology version if constructor version is not specify.
topVer = topVer.topologyVersion() > 0 ? topVer : cctx.affinity().affinityTopologyVersion();
}
- if (!cctx.mvccEnabled() || mvccSnapshot != null)
- initialMap(topVer);
- else {
- mvccTracker = new MvccQueryTrackerImpl(cctx, canRemap);
-
- registrateFutureInMvccManager(this);
-
- mvccTracker.requestSnapshot(topVer, this);
- }
+ initialMap(topVer);
}
/**
@@ -186,30 +161,11 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
}
/** {@inheritDoc} */
- @Override public void onResponse(MvccSnapshot res) {
- AffinityTopologyVersion topVer = mvccTracker.topologyVersion();
-
- assert topVer != null;
-
- initialMap(topVer);
- }
-
- /** {@inheritDoc} */
- @Override public void onError(IgniteCheckedException e) {
- onDone(e);
- }
-
- /** {@inheritDoc} */
@Override public boolean onDone(Map<K, V> res, Throwable err) {
if (super.onDone(res, err)) {
if (trackable)
cctx.mvcc().removeFuture(futId);
- MvccQueryTracker mvccTracker = this.mvccTracker;
-
- if (mvccTracker != null)
- mvccTracker.onDone();
-
cache().sendTtlUpdateRequest(expiryPlc);
return true;
@@ -530,7 +486,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
taskName,
expiryPlc,
!deserializeBinary,
- mvccSnapshot(),
null);
if (getRes != null) {
@@ -549,8 +504,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
null,
taskName,
expiryPlc,
- !deserializeBinary,
- mvccSnapshot());
+ !deserializeBinary);
}
entry.touch();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index c5a2bff..c23eb1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -488,7 +488,6 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
taskName,
expiryPlc,
true,
- mvccSnapshot,
null);
if (res != null) {
@@ -507,8 +506,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
null,
taskName,
expiryPlc,
- true,
- mvccSnapshot);
+ true);
}
entry.touch();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxQueryEnlistResultHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxQueryEnlistResultHandler.java
index 72f0173..30efc10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxQueryEnlistResultHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxQueryEnlistResultHandler.java
@@ -23,7 +23,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistResponse;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -124,23 +123,19 @@ public final class NearTxQueryEnlistResultHandler implements CI1<IgniteInternalF
GridNearTxQueryEnlistResponse res = createResponse(fut);
if (res.removeMapping()) {
- tx.skipCompletedVersions(true);
-
- tx.rollbackDhtLocalAsync().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut0) {
- try {
- cctx.io().send(nearNodeId, res, cctx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- U.error(fut.log, "Failed to send near enlist response [" +
- "tx=" + CU.txString(tx) +
- ", node=" + nearNodeId +
- ", res=" + res + ']', e);
-
- throw new GridClosureException(e);
- }
- }
- });
+ cctx.tm().forgetTx(tx);
+
+ try {
+ cctx.io().send(nearNodeId, res, cctx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(fut.log, "Failed to send near enlist response [" +
+ "tx=" + CU.txString(tx) +
+ ", node=" + nearNodeId +
+ ", res=" + res + ']', e);
+
+ throw new GridClosureException(e);
+ }
return;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d44dcd0..5952617 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1535,7 +1535,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
taskName,
expiry,
true,
- null,
null);
if (getRes != null) {
@@ -1554,8 +1553,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
taskName,
expiry,
- !deserializeBinary,
- null);
+ !deserializeBinary);
}
// Entry was not in memory or in swap, so we remove it from cache.
@@ -2174,8 +2172,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
entryProcessor,
taskName,
null,
- req.keepBinary(),
- null);
+ req.keepBinary());
Object oldVal = null;
Object updatedVal = null;
@@ -2355,8 +2352,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
taskName,
null,
- req.keepBinary(),
- null);
+ req.keepBinary());
Object val = ctx.config().getInterceptor().onBeforePut(
new CacheLazyEntry(
@@ -2401,8 +2397,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
taskName,
null,
- req.keepBinary(),
- null);
+ req.keepBinary());
IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor()
.onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), old, req.keepBinary()));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 76e11d6..281669e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -48,11 +48,11 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUn
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtEmbeddedFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFinishedFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
@@ -229,8 +229,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
}, opCtx, /*retry*/false);
}
- AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
-
subjId = ctx.subjectIdPerCall(subjId, opCtx);
MvccSnapshot mvccSnapshot = null;
@@ -253,6 +251,15 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
}
}
+ AffinityTopologyVersion topVer;
+
+ if (tx != null)
+ topVer = tx.topologyVersion();
+ else if (mvccTracker != null)
+ topVer = mvccTracker.topologyVersion();
+ else
+ topVer = ctx.affinity().affinityTopologyVersion();
+
GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx,
ctx.toCacheKeyObject(key),
topVer,
@@ -349,7 +356,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
}
}
- AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+ AffinityTopologyVersion topVer;
+
+ if (tx != null)
+ topVer = tx.topologyVersion();
+ else if (mvccTracker != null)
+ topVer = mvccTracker.topologyVersion();
+ else
+ topVer = ctx.affinity().affinityTopologyVersion();
IgniteInternalFuture<Map<K, V>> fut = loadAsync(
ctx.cacheKeysView(keys),
@@ -384,47 +398,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
}
/**
- * @param keys Keys to load.
- * @param readThrough Read through flag.
- * @param forcePrimary Force get from primary node flag.
- * @param topVer Topology version.
- * @param subjId Subject ID.
- * @param taskName Task name.
- * @param deserializeBinary Deserialize binary flag.
- * @param expiryPlc Expiry policy.
- * @param skipVals Skip values flag.
- * @param needVer Need version.
- * @return Loaded values.
- */
- private IgniteInternalFuture<Map<K, V>> loadAsync(
- @Nullable Collection<KeyCacheObject> keys,
- boolean readThrough,
- boolean forcePrimary,
- AffinityTopologyVersion topVer,
- @Nullable UUID subjId,
- String taskName,
- boolean deserializeBinary,
- boolean recovery,
- @Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean skipVals,
- boolean needVer,
- @Nullable String txLbl) {
- return loadAsync(keys,
- readThrough,
- forcePrimary,
- topVer, subjId,
- taskName,
- deserializeBinary,
- recovery,
- expiryPlc,
- skipVals,
- needVer,
- false,
- txLbl,
- null);
- }
-
- /**
* @param key Key to load.
* @param readThrough Read through flag.
* @param forcePrimary Force get from primary node flag.
@@ -508,7 +481,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
@Nullable String txLbl,
@Nullable MvccSnapshot mvccSnapshot
) {
- assert mvccSnapshot == null || ctx.mvccEnabled();
+ assert (mvccSnapshot == null) == !ctx.mvccEnabled();
if (keys == null || keys.isEmpty())
return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -516,9 +489,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (expiryPlc == null)
expiryPlc = expiryPolicy(null);
- // Optimization: try to resolve value locally and escape 'get future' creation. Not applcable for MVCC,
- // because local node may contain a visible version which is no the most recent one.
- if (!ctx.mvccEnabled() && !forcePrimary && ctx.affinityNode()) {
+ // Optimization: try to resolve value locally and escape 'get future' creation.
+ if (!forcePrimary && ctx.affinityNode()) {
try {
Map<K, V> locVals = null;
@@ -594,7 +566,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
taskName,
expiryPlc,
!deserializeBinary,
- mvccSnapshot,
null);
if (getRes != null) {
@@ -613,8 +584,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
null,
taskName,
expiryPlc,
- !deserializeBinary,
- mvccSnapshot);
+ !deserializeBinary);
}
// Entry was not in memory or in swap, so we remove it from cache.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 8350fe6..5cfa56e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2156,7 +2156,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
tryToPerformLocalSnapshotOperation();
if (err == null)
- cctx.coordinators().onExchangeDone(exchCtx.events().discoveryCache());
+ cctx.coordinators().onExchangeDone(events().discoveryCache());
// Create and destory caches and cache proxies.
cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/AckCoordinatorOnRollback.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/AckCoordinatorOnRollback.java
deleted file mode 100644
index 1648da9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/AckCoordinatorOnRollback.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.util.typedef.CIX1;
-
-/** */
-public class AckCoordinatorOnRollback extends CIX1<IgniteInternalFuture<IgniteInternalTx>> {
- /** */
- private static final long serialVersionUID = 8172699207968328284L;
-
- /** */
- private final GridNearTxLocal tx;
-
- /**
- * @param tx Transaction.
- */
- public AckCoordinatorOnRollback(GridNearTxLocal tx) {
- this.tx = tx;
- }
-
- /** {@inheritDoc} */
- @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) throws IgniteCheckedException {
- assert fut.isDone();
-
- MvccQueryTracker tracker = tx.mvccQueryTracker();
- MvccSnapshot mvccSnapshot = tx.mvccSnapshot();
-
- if (tracker != null) // Optimistic tx.
- tracker.onDone(tx, false);
- else if (mvccSnapshot != null)// Pessimistic tx.
- tx.context().coordinators().ackTxRollback(mvccSnapshot);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index ca71e60..5615d44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -134,9 +134,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
else {
AffinityTopologyVersion mapTopVer = topVer;
- if (mapTopVer == null) {
+ if (mapTopVer == null)
mapTopVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
- }
map(keys, Collections.emptyMap(), mapTopVer);
}
@@ -361,8 +360,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
taskName,
expiryPlc,
!deserializeBinary,
- null,
- null); // TODO IGNITE-7371
+ null);
if (res != null) {
v = res.value();
@@ -380,8 +378,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
null,
taskName,
expiryPlc,
- !deserializeBinary,
- null); // TODO IGNITE-7371
+ !deserializeBinary);
}
}
@@ -502,8 +499,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
taskName,
expiryPlc,
!deserializeBinary,
- null,
- null); // TODO IGNITE-7371
+ null);
if (res != null) {
v = res.value();
@@ -521,8 +517,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
null,
taskName,
expiryPlc,
- !deserializeBinary,
- null); // TODO IGNITE-7371
+ !deserializeBinary);
}
// Entry was not in memory or in swap, so we remove it from cache.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index d2e1586..4ea65a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -274,7 +274,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
*/
@SuppressWarnings("unchecked")
private void preparePessimistic() {
- assert !tx.implicitSingle() || tx.queryEnlisted(); // Non-mvcc implicit-single tx goes fast commit way.
+ assert !tx.implicitSingle() || tx.txState().mvccEnabled(); // Non-mvcc implicit-single tx goes fast commit way.
Map<UUID, GridDistributedTxMapping> mappings = new HashMap<>();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
index 27b2fd7..b782483 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
@@ -339,8 +339,7 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun
AffinityTopologyVersion topVer = fut.topologyVersion();
- if (tx != null)
- tx.topologyVersion(topVer);
+ tx.topologyVersion(topVer);
if (this.topVer == null)
this.topVer = topVer;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
index 8cbe93e..d98065d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
@@ -221,8 +221,6 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
throw new ClusterTopologyServerNotFoundException("Failed to get primary node " +
"[topVer=" + topVer + ", key=" + key + ']');
- tx.markQueryEnlisted(null);
-
if (!sequential)
batch = batches.get(node.id());
else if (batch != null && !batch.node().equals(node))
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
index a3a5cdb..2488eb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
@@ -19,12 +19,10 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.transactions.TransactionState;
/**
*
@@ -37,6 +35,8 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern
* @param finishFut Finish future.
*/
GridNearTxFinishAndAckFuture(NearTxFinishFuture finishFut) {
+ finishFut.listen(this::onFinishFutureDone);
+
this.finishFut = finishFut;
}
@@ -51,92 +51,41 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern
}
/** {@inheritDoc} */
- @Override @SuppressWarnings("unchecked")
- public void finish(boolean commit, boolean clearThreadMap, boolean onTimeout) {
- finishFut.finish(commit, clearThreadMap, onTimeout);
-
- if (finishFut.commit()) {
- finishFut.listen((IgniteInClosure)new IgniteInClosure<NearTxFinishFuture>() {
- @Override public void apply(final NearTxFinishFuture fut) {
- GridNearTxLocal tx = fut.tx();
-
- IgniteInternalFuture<Void> ackFut = null;
-
- MvccQueryTracker tracker = tx.mvccQueryTracker();
-
- MvccSnapshot mvccSnapshot = tx.mvccSnapshot();
-
- if (tracker != null)
- ackFut = tracker.onDone(tx, commit);
- else if (mvccSnapshot != null) {
- if (commit)
- ackFut = tx.context().coordinators().ackTxCommit(mvccSnapshot);
- else
- tx.context().coordinators().ackTxRollback(mvccSnapshot);
- }
-
- if (ackFut != null) {
- ackFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
- @Override public void apply(IgniteInternalFuture<Void> ackFut) {
- Exception err = null;
-
- try {
- fut.get();
-
- ackFut.get();
- }
- catch (Exception e) {
- err = e;
- }
- catch (Error e) {
- onDone(e);
-
- throw e;
- }
-
- if (err != null)
- onDone(err);
- else
- onDone(fut.tx());
- }
- });
- }
- else
- finishWithFutureResult(fut);
- }
- });
- }
- else {
- finishFut.listen(new IgniteInClosure<IgniteInternalFuture>() {
- @Override public void apply(IgniteInternalFuture fut) {
- finishWithFutureResult(fut);
- }
- });
- }
+ @Override public void onNodeStop(IgniteCheckedException e) {
+ finishFut.onNodeStop(e);
}
/** {@inheritDoc} */
- @Override public void onNodeStop(IgniteCheckedException e) {
- super.onDone(finishFut.tx(), e);
+ @Override public void finish(boolean commit, boolean clearThreadMap, boolean onTimeout) {
+ finishFut.finish(commit, clearThreadMap, onTimeout);
}
- /**
- * @param fut Future.
- */
- private void finishWithFutureResult(IgniteInternalFuture<IgniteInternalTx> fut) {
- try {
- onDone(fut.get());
- }
- catch (IgniteCheckedException | RuntimeException e) {
- onDone(e);
- }
- catch (Error e) {
- onDone(e);
+ /** */
+ private void onFinishFutureDone(IgniteInternalFuture<IgniteInternalTx> fut) {
+ GridNearTxLocal tx = tx(); Throwable err = fut.error();
+
+ if (tx.state() == TransactionState.COMMITTED)
+ tx.context().coordinators().ackTxCommit(tx.mvccSnapshot())
+ .listen(fut0 -> onDone(tx, addSuppressed(err, fut0.error())));
+ else {
+ tx.context().coordinators().ackTxRollback(tx.mvccSnapshot());
- throw e;
+ onDone(tx, err);
}
}
+ /** */
+ private Throwable addSuppressed(Throwable to, Throwable ex) {
+ if (ex == null)
+ return to;
+ else if (to == null)
+ return ex;
+ else
+ to.addSuppressed(ex);
+
+ return to;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearTxFinishAndAckFuture.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index ca9e1bf..a3171d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -40,13 +40,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.processors.cache.mvcc.MvccFuture;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
-import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
@@ -441,23 +438,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
private void doFinish(boolean commit, boolean clearThreadMap) {
try {
if (tx.localFinish(commit, clearThreadMap) || (!commit && tx.state() == UNKNOWN)) {
- GridLongList waitTxs = tx.mvccWaitTransactions();
-
- if (waitTxs != null) {
- MvccSnapshot snapshot = tx.mvccSnapshot();
-
- MvccCoordinator crd = cctx.coordinators().currentCoordinator();
-
- assert snapshot != null;
-
- if (snapshot.coordinatorVersion() == crd.coordinatorVersion()) {
- IgniteInternalFuture fut = cctx.coordinators()
- .waitTxsFuture(cctx.coordinators().currentCoordinatorId(), waitTxs);
-
- add(fut);
- }
- }
-
// Cleanup transaction if heuristic failure.
if (tx.state() == UNKNOWN)
cctx.tm().rollbackTx(tx, clearThreadMap, false);
@@ -467,13 +447,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
GridDistributedTxMapping mapping = mappings.singleMapping();
if (mapping != null) {
- assert !hasFutures() || isDone() || waitTxs != null : futures();
+ assert !hasFutures() || isDone() : futures();
finish(1, mapping, commit, !clearThreadMap);
}
}
else {
- assert !hasFutures() || isDone() || waitTxs != null : futures();
+ assert !hasFutures() || isDone() : futures();
finish(mappings.mappings(), commit, !clearThreadMap);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index b918753..08dfe2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -62,8 +62,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -91,7 +89,6 @@ import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.CX1;
-import org.apache.ignite.internal.util.typedef.CX2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -190,9 +187,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
/** Tx label. */
@Nullable private String lb;
- /** */
- private MvccQueryTracker mvccTracker;
-
/** Whether this is Mvcc transaction or not.<p>
* {@code null} means there haven't been any calls made on this transaction, and first operation will give this
* field actual value.
@@ -266,13 +260,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
trackTimeout = timeout() > 0 && !implicit() && cctx.time().addTimeoutObject(this);
}
- /**
- * @return Mvcc query version tracker.
- */
- public MvccQueryTracker mvccQueryTracker() {
- return mvccTracker;
- }
-
/** {@inheritDoc} */
@Override public boolean near() {
return true;
@@ -1394,7 +1381,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
resolveTaskName(),
null,
keepBinary,
- null, // TODO IGNITE-7371
null) : null;
if (res != null) {
@@ -1413,8 +1399,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
entryProcessor,
resolveTaskName(),
null,
- keepBinary,
- null); // TODO IGNITE-7371
+ keepBinary);
}
}
catch (ClusterTopologyCheckedException e) {
@@ -1967,17 +1952,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
/**
- * @param cctx Cache context.
- * @return Mvcc snapshot for read inside tx (initialized once for OPTIMISTIC SERIALIZABLE and REPEATABLE_READ txs).
- */
- private MvccSnapshot mvccReadSnapshot(GridCacheContext cctx) {
- if (!cctx.mvccEnabled() || mvccTracker == null)
- return null;
-
- return mvccTracker.snapshot();
- }
-
- /**
* @param cacheCtx Cache context.
* @param cacheIds Involved cache ids.
* @param parts Partitions.
@@ -2290,7 +2264,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
resolveTaskName(),
null,
txEntry.keepBinary(),
- null, // TODO IGNITE-7371
null);
if (getRes != null) {
@@ -2309,8 +2282,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
transformClo,
resolveTaskName(),
null,
- txEntry.keepBinary(),
- null); // TODO IGNITE-7371
+ txEntry.keepBinary());
}
// If value is in cache and passed the filter.
@@ -2590,8 +2562,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
resolveTaskName(),
null,
txEntry.keepBinary(),
- null,
- null); // TODO IGNITE-7371
+ null);
if (getRes != null) {
val = getRes.value();
@@ -2609,8 +2580,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
transformClo,
resolveTaskName(),
null,
- txEntry.keepBinary(),
- null); // TODO IGNITE-7371
+ txEntry.keepBinary());
}
if (val != null) {
@@ -2678,7 +2648,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
resolveTaskName(),
accessPlc,
!deserializeBinary,
- mvccReadSnapshot(cacheCtx), // TODO IGNITE-7371
null) : null;
if (getRes != null) {
@@ -2697,8 +2666,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
null,
resolveTaskName(),
accessPlc,
- !deserializeBinary,
- mvccReadSnapshot(cacheCtx)); // TODO IGNITE-7371
+ !deserializeBinary);
}
if (val != null) {
@@ -3037,7 +3005,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
needVer,
/*keepCacheObject*/true,
recovery,
- mvccReadSnapshot(cacheCtx),
+ null,
label()
).chain(new C1<IgniteInternalFuture<Object>, Void>() {
@Override public Void apply(IgniteInternalFuture<Object> f) {
@@ -3071,7 +3039,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
needVer,
/*keepCacheObject*/true,
label(),
- mvccReadSnapshot(cacheCtx)
+ null
).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
@Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) {
try {
@@ -3168,8 +3136,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
resolveTaskName(),
expiryPlc0,
txEntry == null ? keepBinary : txEntry.keepBinary(),
- null,
- null); // TODO IGNITE-7371
+ null);
if (res == null) {
if (misses == null)
@@ -3351,6 +3318,19 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
}
+
+ /** {@inheritDoc} */
+ @Override public boolean queryEnlisted() {
+ if (!txState.mvccEnabled())
+ return false;
+ else if (qryEnlisted)
+ return true;
+ else if (mappings.single())
+ return !mappings.empty() && mappings.singleMapping().queryUpdate();
+ else
+ return mappings.mappings().stream().anyMatch(GridDistributedTxMapping::queryUpdate);
+ }
+
/**
* Adds key mapping to dht mapping.
*
@@ -3881,14 +3861,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
NearTxFinishFuture fut = fast ? new GridNearTxFastFinishFuture(this, commit) :
new GridNearTxFinishFuture<>(cctx, this, commit);
- if (mvccQueryTracker() != null || mvccSnapshot != null || txState.mvccEnabled()) {
- if (commit)
- fut = new GridNearTxFinishAndAckFuture(fut);
- else
- fut.listen(new AckCoordinatorOnRollback(this));
- }
-
- return fut;
+ return mvccSnapshot != null ? new GridNearTxFinishAndAckFuture(fut) : fut;
}
/** {@inheritDoc} */
@@ -3977,9 +3950,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
* @return {@code True} if 'fast finish' path can be used for transaction completion.
*/
private boolean fastFinish() {
- return writeMap().isEmpty() && !queryEnlisted()
- && ((optimistic() && !serializable()) || readMap().isEmpty())
- && (mappings.single() || F.view(mappings.mappings(), CU.FILTER_QUERY_MAPPING).isEmpty());
+ return !queryEnlisted() && writeMap().isEmpty()
+ && ((optimistic() && !serializable()) || readMap().isEmpty());
}
/**
@@ -4050,26 +4022,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
// Do not create finish future if there are no remote nodes.
if (F.isEmpty(dhtMap) && F.isEmpty(nearMap)) {
- if (prep != null) {
- return new GridEmbeddedFuture<>(new CX2<IgniteInternalTx, Exception, IgniteInternalTx>() {
- @Override public IgniteInternalTx applyx(IgniteInternalTx o, Exception e) throws IgniteCheckedException {
- cctx.tm().mvccFinish(GridNearTxLocal.this, e == null);
+ IgniteInternalFuture fut = prep != null ? prep : new GridFinishedFuture<>(this);
- return o;
- }
- }, (IgniteInternalFuture<IgniteInternalTx>)prep);
- }
-
- try {
- cctx.tm().mvccFinish(this, true);
-
- return new GridFinishedFuture<>(this);
- }
- catch (IgniteCheckedException e) {
- commitError(e);
+ if (fut.isDone())
+ cctx.tm().mvccFinish(this);
+ else
+ fut.listen(f -> cctx.tm().mvccFinish(this));
- return new GridFinishedFuture<>(e);
- }
+ return fut;
}
final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java
index 7f8a121..345e742 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.S;
/**
* Future to obtain/lock topology version for SELECT FOR UPDATE.
@@ -167,4 +168,9 @@ public class TxTopologyVersionFuture extends GridFutureAdapter<AffinityTopologyV
public boolean clientFirst() {
return cctx.localNode().isClient() && !topLocked && !tx.hasRemoteLocks();
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TxTopologyVersionFuture.class, this, super.toString());
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 07a6dfc..f46e90f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -457,7 +457,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
taskName,
expiry,
!deserializeBinary,
- null,
null);
if (res != null) {
@@ -485,8 +484,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
null,
taskName,
expiry,
- !deserializeBinary,
- null);
+ !deserializeBinary);
if (v != null) {
ctx.addResult(vals,
@@ -1093,8 +1091,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
entryProcessor,
taskName,
null,
- keepBinary,
- null);
+ keepBinary);
Object oldVal = null;
@@ -1236,8 +1233,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
null,
taskName,
null,
- keepBinary,
- null);
+ keepBinary);
Object interceptorVal = ctx.config().getInterceptor().onBeforePut(new CacheLazyEntry(
ctx, entry.key(), old, keepBinary), val);
@@ -1272,8 +1268,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
null,
taskName,
null,
- keepBinary,
- null);
+ keepBinary);
IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor()
.onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), old, keepBinary));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
index 045177a..c724da0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
@@ -28,41 +28,53 @@ import org.apache.ignite.internal.util.typedef.internal.S;
*/
public class MvccCoordinator implements Serializable {
/** */
+ public static final MvccCoordinator DISCONNECTED_COORDINATOR =
+ new MvccCoordinator(AffinityTopologyVersion.NONE, null, 0, false);
+
+ /** */
+ public static final MvccCoordinator UNASSIGNED_COORDINATOR =
+ new MvccCoordinator(AffinityTopologyVersion.NONE, null, 0, false);
+
+ /** */
private static final long serialVersionUID = 0L;
/** */
+ @GridToStringInclude
+ private final AffinityTopologyVersion topVer;
+
+ /** */
private final UUID nodeId;
/**
* Unique coordinator version, increases when new coordinator is assigned,
* can differ from topVer if we decide to assign coordinator manually.
*/
- private final long crdVer;
+ private final long ver;
/** */
- @GridToStringInclude
- private final AffinityTopologyVersion topVer;
+ private final boolean local;
+
+ /** */
+ private volatile boolean initialized;
/**
- * @param nodeId Coordinator node ID.
- * @param crdVer Coordinator version.
* @param topVer Topology version when coordinator was assigned.
+ * @param nodeId Coordinator node ID.
+ * @param ver Coordinator version.
+ * @param local {@code True} if the local node is a coordinator.
*/
- public MvccCoordinator(UUID nodeId, long crdVer, AffinityTopologyVersion topVer) {
- assert nodeId != null;
- assert crdVer > 0 : crdVer;
- assert topVer != null;
-
- this.nodeId = nodeId;
- this.crdVer = crdVer;
+ public MvccCoordinator(AffinityTopologyVersion topVer, UUID nodeId, long ver, boolean local) {
this.topVer = topVer;
+ this.nodeId = nodeId;
+ this.ver = ver;
+ this.local = local;
}
/**
- * @return Unique coordinator version.
+ * @return Topology version when coordinator was assigned.
*/
- public long coordinatorVersion() {
- return crdVer;
+ public AffinityTopologyVersion topologyVersion() {
+ return topVer;
}
/**
@@ -73,10 +85,50 @@ public class MvccCoordinator implements Serializable {
}
/**
- * @return Topology version when coordinator was assigned.
+ * @return Unique coordinator version.
*/
- public AffinityTopologyVersion topologyVersion() {
- return topVer;
+ public long version() {
+ return ver;
+ }
+
+ /**
+ *
+ * @return {@code True} if the coordinator is local.
+ */
+ public boolean local() {
+ return local;
+ }
+
+ /**
+ *
+ * @return {@code True} if the coordinator is disconnected.
+ */
+ public boolean disconnected() {
+ return this == DISCONNECTED_COORDINATOR;
+ }
+
+ /**
+ *
+ * @return {@code True} if the coordinator has not been assigned yet.
+ */
+ public boolean unassigned() {
+ return this == UNASSIGNED_COORDINATOR;
+ }
+
+ /**
+ *
+ * @return {@code True} if the coordinator is initialized.
+ */
+ public boolean initialized() {
+ return initialized;
+ }
+
+ /**
+ *
+ * @param initialized Initialized flag.
+ */
+ public void initialized(boolean initialized) {
+ this.initialized = initialized;
}
/** {@inheritDoc} */
@@ -89,12 +141,12 @@ public class MvccCoordinator implements Serializable {
MvccCoordinator that = (MvccCoordinator)o;
- return crdVer == that.crdVer;
+ return ver == that.ver;
}
/** {@inheritDoc} */
@Override public int hashCode() {
- return (int)(crdVer ^ (crdVer >>> 32));
+ return (int)(ver ^ (ver >>> 32));
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDiscoveryData.java
deleted file mode 100644
index d2e936f..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDiscoveryData.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.io.Serializable;
-
-/**
- * MVCC discovery data to be shared between nodes on join.
- */
-public class MvccDiscoveryData implements Serializable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Current coordinator. */
- private MvccCoordinator crd;
-
- /**
- * @param crd Coordinator.
- */
- public MvccDiscoveryData(MvccCoordinator crd) {
- this.crd = crd;
- }
-
- /**
- * @return Current coordinator.
- */
- public MvccCoordinator coordinator() {
- return crd;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MvccDiscoveryData.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java
index 6218bc0..26e4574 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.mvcc;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
@@ -52,6 +53,11 @@ class MvccPreviousCoordinatorQueries {
/** */
private boolean initDone;
+ /** */
+ void init() {
+ init(null, Collections.emptyList(), null);
+ }
+
/**
* @param nodeQueries Active queries map.
* @param nodes Cluster nodes.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
index fd45c7a..9f8f702 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
@@ -17,19 +17,16 @@
package org.apache.ignite.internal.processors.cache.mvcc;
-import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.GridProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.util.GridLongList;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -40,8 +37,9 @@ public interface MvccProcessor extends GridProcessor {
* Local join callback.
*
* @param evt Discovery event.
+ * @param discoCache Disco cache.
*/
- void onLocalJoin(DiscoveryEvent evt);
+ void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache);
/**
* Exchange done callback.
@@ -51,50 +49,35 @@ public interface MvccProcessor extends GridProcessor {
void onExchangeDone(DiscoCache discoCache);
/**
- * @param nodeId Node ID
- * @param activeQueries Active queries.
- */
- void processClientActiveQueries(UUID nodeId, @Nullable GridLongList activeQueries);
-
- /**
* @return Coordinator.
*/
- @Nullable MvccCoordinator currentCoordinator();
-
- /**
- * @return Current coordinator node ID.
- */
- UUID currentCoordinatorId();
+ @NotNull MvccCoordinator currentCoordinator();
/**
* @param crdVer Mvcc coordinator version.
* @param cntr Mvcc counter.
* @return State for given mvcc version.
- * @throws IgniteCheckedException If fails.
*/
- byte state(long crdVer, long cntr) throws IgniteCheckedException;
+ byte state(long crdVer, long cntr);
/**
* @param ver Version to check.
* @return State for given mvcc version.
- * @throws IgniteCheckedException If fails.
*/
- byte state(MvccVersion ver) throws IgniteCheckedException;
+ byte state(MvccVersion ver);
/**
* @param ver Version.
* @param state State.
- * @throws IgniteCheckedException If fails;
*/
- void updateState(MvccVersion ver, byte state) throws IgniteCheckedException;
+ void updateState(MvccVersion ver, byte state);
/**
* @param ver Version.
* @param state State.
* @param primary Flag if this is primary node.
- * @throws IgniteCheckedException If fails;
*/
- void updateState(MvccVersion ver, byte state, boolean primary) throws IgniteCheckedException;
+ void updateState(MvccVersion ver, byte state, boolean primary);
/**
* @param crd Mvcc coordinator version.
@@ -113,9 +96,13 @@ public interface MvccProcessor extends GridProcessor {
* @param cctx Cache context.
* @param locked Version the entry is locked by.
* @return Future, which is completed as soon as the lock is released.
- * @throws IgniteCheckedException If failed.
*/
- IgniteInternalFuture<Void> waitFor(GridCacheContext cctx, MvccVersion locked) throws IgniteCheckedException;
+ IgniteInternalFuture<Void> waitFor(GridCacheContext cctx, MvccVersion locked);
+
+ /**
+ * @param locked Version the entry is locked by.
+ */
+ void releaseWaiters(MvccVersion locked);
/**
* @param tracker Query tracker.
@@ -130,54 +117,50 @@ public interface MvccProcessor extends GridProcessor {
/**
* @return {@link MvccSnapshot} if this is a coordinator node and coordinator is initialized.
* {@code Null} in other cases.
- * @throws ClusterTopologyCheckedException If coordinator doesn't match locked topology or not assigned.
*/
- MvccSnapshot tryRequestSnapshotLocal() throws ClusterTopologyCheckedException;
+ MvccSnapshot requestWriteSnapshotLocal();
/**
- * @param tx Transaction.
* @return {@link MvccSnapshot} if this is a coordinator node and coordinator is initialized.
* {@code Null} in other cases.
- * @throws ClusterTopologyCheckedException If coordinator doesn't match locked topology or not assigned.
*/
- MvccSnapshot tryRequestSnapshotLocal(@Nullable IgniteInternalTx tx) throws ClusterTopologyCheckedException;
+ MvccSnapshot requestReadSnapshotLocal();
/**
* Requests snapshot on Mvcc coordinator.
*
- * @param tx Transaction.
- * @return Snapshot future.
+ * @return Result future.
*/
- IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync(IgniteInternalTx tx);
+ IgniteInternalFuture<MvccSnapshot> requestReadSnapshotAsync();
/**
* Requests snapshot on Mvcc coordinator.
*
- * @param lsnr Request listener.
+ * @return Result future.
*/
- void requestSnapshotAsync(MvccSnapshotResponseListener lsnr);
+ IgniteInternalFuture<MvccSnapshot> requestWriteSnapshotAsync();
/**
* Requests snapshot on Mvcc coordinator.
*
- * @param tx Transaction
+ * @param crd Expected coordinator.
* @param lsnr Request listener.
*/
- void requestSnapshotAsync(IgniteInternalTx tx, MvccSnapshotResponseListener lsnr);
+ void requestReadSnapshotAsync(MvccCoordinator crd, MvccSnapshotResponseListener lsnr);
/**
- * @param updateVer Transaction update version.
- * @return Acknowledge future.
+ * Requests snapshot on Mvcc coordinator.
+ *
+ * @param crd Expected coordinator.
+ * @param lsnr Request listener.
*/
- IgniteInternalFuture<Void> ackTxCommit(MvccSnapshot updateVer);
+ void requestWriteSnapshotAsync(MvccCoordinator crd, MvccSnapshotResponseListener lsnr);
/**
* @param updateVer Transaction update version.
- * @param readSnapshot Transaction read version.
- * @param qryId Query tracker id.
* @return Acknowledge future.
*/
- IgniteInternalFuture<Void> ackTxCommit(MvccVersion updateVer, MvccSnapshot readSnapshot, long qryId);
+ IgniteInternalFuture<Void> ackTxCommit(MvccSnapshot updateVer);
/**
* @param updateVer Transaction update version.
@@ -185,26 +168,12 @@ public interface MvccProcessor extends GridProcessor {
void ackTxRollback(MvccVersion updateVer);
/**
- * @param updateVer Transaction update version.
- * @param readSnapshot Transaction read version.
- * @param qryTrackerId Query tracker id.
- */
- void ackTxRollback(MvccVersion updateVer, MvccSnapshot readSnapshot, long qryTrackerId);
-
- /**
* @param snapshot Query version.
* @param qryId Query tracker ID.
*/
void ackQueryDone(MvccSnapshot snapshot, long qryId);
/**
- * @param crdId Coordinator ID.
- * @param txs Transaction IDs.
- * @return Future.
- */
- IgniteInternalFuture<Void> waitTxsFuture(UUID crdId, GridLongList txs);
-
- /**
* @param log Logger.
* @param diagCtx Diagnostic request.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index 6621ddb..f3c563c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
@@ -40,7 +41,8 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
@@ -51,7 +53,6 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.CustomEventListener;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
@@ -61,12 +62,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryCntr;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryId;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx;
-import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTxAndQueryCntr;
-import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTxAndQueryId;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccActiveQueriesMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccFutureResponse;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccMessage;
@@ -74,7 +72,6 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotReq
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccWaitTxsRequest;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxKey;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
@@ -82,12 +79,10 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -100,6 +95,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -134,7 +130,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.CacheDataR
/**
* MVCC processor.
*/
-@SuppressWarnings("serial")
+@SuppressWarnings("unchecked")
public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, DatabaseLifecycleListener {
/** */
private static final boolean FORCE_MVCC =
@@ -146,9 +142,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
/** */
private static final Waiter LOCAL_TRANSACTION_MARKER = new LocalTransactionMarker();
- /** Dummy tx for vacuum. */
- private static final IgniteInternalTx DUMMY_TX = new GridNearTxLocal();
-
/** For tests only. */
private static IgniteClosure<Collection<ClusterNode>, ClusterNode> crdC;
@@ -161,11 +154,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
MvccProcessorImpl.crdC = crdC;
}
- /** Topology version when local node was assigned as coordinator. */
- private volatile long crdVer;
-
/** */
- private volatile MvccCoordinator curCrd;
+ private volatile MvccCoordinator curCrd = MvccCoordinator.UNASSIGNED_COORDINATOR;
/** */
private TxLog txLog;
@@ -206,9 +196,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
private final Map<Long, WaitAckFuture> ackFuts = new ConcurrentHashMap<>();
/** */
- private final Map<Long, GridFutureAdapter> waitTxFuts = new ConcurrentHashMap<>();
-
- /** */
private final Map<TxKey, Waiter> waitMap = new ConcurrentHashMap<>();
/** */
@@ -226,8 +213,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
/** Flag whether all nodes in cluster support MVCC. */
private volatile boolean mvccSupported = true;
- /** Flag whether coordinator was changed by the last discovery event. */
- private volatile boolean crdChanged;
+ /** */
+ private volatile AffinityTopologyVersion readyVer = AffinityTopologyVersion.NONE;
/**
* Maps failed node id to votes accumulator for that node.
@@ -245,12 +232,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
- ctx.event().addLocalEventListener(new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- onDiscovery((DiscoveryEvent)evt);
- }
- },
- EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED);
+ ctx.event().addDiscoveryEventListener(this::onDiscovery, EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED);
ctx.io().addMessageListener(TOPIC_CACHE_COORDINATOR, new CoordinatorMessageListener());
@@ -271,6 +253,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
@Override public void preProcessCacheConfiguration(CacheConfiguration ccfg) {
if (FORCE_MVCC && ccfg.getAtomicityMode() == TRANSACTIONAL && !CU.isSystemCache(ccfg.getName())) {
ccfg.setAtomicityMode(TRANSACTIONAL_SNAPSHOT);
+ //noinspection unchecked
ccfg.setNearConfiguration(null);
}
@@ -371,86 +354,84 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
private void txLogPageStoreInit(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
assert CU.isPersistenceEnabled(ctx.config());
+ //noinspection ConstantConditions
ctx.cache().context().pageStore().initialize(TX_LOG_CACHE_ID, 1,
TX_LOG_CACHE_NAME, mgr.dataRegion(TX_LOG_CACHE_NAME).memoryMetrics());
}
/** {@inheritDoc} */
@Override public void onExchangeDone(DiscoCache discoCache) {
+ assert discoCache != null && readyVer.compareTo(discoCache.version()) < 0;
+
MvccCoordinator curCrd0 = curCrd;
- if (crdChanged) {
- // Rollback all transactions with old snapshots.
- ctx.cache().context().tm().rollbackMvccTxOnCoordinatorChange();
+ if (curCrd0.disconnected())
+ return; // Nothing to do.
- // Complete init future if local node is a new coordinator. All previous txs are already completed here.
- if (crdVer != 0 && !initFut.isDone()) {
- assert curCrd0 != null && curCrd0.nodeId().equals(ctx.localNodeId());
+ assert curCrd0.topologyVersion().initialized();
- initFut.onDone();
- }
+ if (curCrd0.initialized() && curCrd0.local())
+ cleanupOrphanedServerTransactions(discoCache.serverNodes());
- crdChanged = false;
- }
- else {
- if (curCrd0 != null && ctx.localNodeId().equals(curCrd0.nodeId()) && discoCache != null)
- cleanupOrphanedServerTransactions(discoCache.serverNodes());
- }
+ if (!curCrd0.initialized() && coordinatorChanged(curCrd0, readyVer, discoCache.version()))
+ initialize(curCrd0);
}
/** {@inheritDoc} */
- @Override public void onLocalJoin(DiscoveryEvent evt) {
- assert evt.type() == EVT_NODE_JOINED && ctx.localNodeId().equals(evt.eventNode().id());
+ @Override public void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache) {
+ assert evt.type() == EVT_NODE_JOINED && evt.eventNode().isLocal();
+
+ checkMvccSupported(discoCache.allNodes());
- onCoordinatorChanged(evt.topologyNodes(), evt.topologyVersion(), false);
+ onCoordinatorChanged(discoCache.version(), discoCache.allNodes(), false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+ MvccCoordinator curCrd0 = curCrd;
+
+ if (!curCrd0.disconnected()) {
+ // Notify all listeners waiting for a snapshot.
+ onCoordinatorFailed(curCrd0.nodeId());
+
+ curCrd = MvccCoordinator.DISCONNECTED_COORDINATOR;
+
+ readyVer = AffinityTopologyVersion.NONE;
+ }
}
/**
- * Discovery listener. Note: initial join event is handled by {@link MvccProcessorImpl#onLocalJoin(DiscoveryEvent)}
+ * Discovery listener. Note: initial join event is handled by {@link MvccProcessorImpl#onLocalJoin}
* method.
*
* @param evt Discovery event.
*/
- private void onDiscovery(DiscoveryEvent evt) {
- assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_JOINED;
+ private void onDiscovery(DiscoveryEvent evt, DiscoCache discoCache) {
+ assert evt.type() == EVT_NODE_FAILED
+ || evt.type() == EVT_NODE_LEFT
+ || evt.type() == EVT_NODE_JOINED;
UUID nodeId = evt.eventNode().id();
+ AffinityTopologyVersion topVer = discoCache.version();
+ List<ClusterNode> nodes = discoCache.allNodes();
+
+ checkMvccSupported(nodes);
MvccCoordinator curCrd0 = curCrd;
if (evt.type() == EVT_NODE_JOINED) {
- if (curCrd0 == null) // Handle join event only if coordinator has not been elected yet.
- onCoordinatorChanged(evt.topologyNodes(), evt.topologyVersion(), false);
-
- return;
+ if (curCrd0.disconnected()) // Handle join event only if coordinator has not been elected yet.
+ onCoordinatorChanged(topVer, nodes, false);
}
-
- // Process mvcc coordinator left event on the rest nodes.
- if (nodeId.equals(curCrd0.nodeId())) {
+ else if (Objects.equals(nodeId, curCrd0.nodeId())) {
// 1. Notify all listeners waiting for a snapshot.
- Map<Long, MvccSnapshotResponseListener> map = snapLsnrs.remove(nodeId);
-
- if (map != null) {
- ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("Failed to request mvcc " +
- "version, coordinator failed: " + nodeId);
+ onCoordinatorFailed(nodeId);
- MvccSnapshotResponseListener lsnr;
-
- for (Long id : map.keySet()) {
- if ((lsnr = map.remove(id)) != null)
- lsnr.onError(ex);
- }
- }
-
- // 2. Notify acknowledge futures.
- for (WaitAckFuture fut : ackFuts.values())
- fut.onNodeLeft(nodeId);
-
- // 3. Process coordinator change.
- onCoordinatorChanged(evt.topologyNodes(), evt.topologyVersion(), true);
+ // 2. Process coordinator change.
+ onCoordinatorChanged(topVer, nodes, true);
}
// Process node left event on the current mvcc coordinator.
- else if (curCrd0.nodeId().equals(ctx.localNodeId())) {
+ else if (curCrd0.local()) {
// 1. Notify active queries.
activeQueries.onNodeFailed(nodeId);
@@ -477,65 +458,83 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
}
}
+ /** */
+ private void onCoordinatorFailed(UUID nodeId) {
+ // 1. Notify all listeners waiting for a snapshot.
+ Map<Long, MvccSnapshotResponseListener> map = snapLsnrs.remove(nodeId);
+
+ if (map != null) {
+ ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("Failed to request mvcc " +
+ "version, coordinator left: " + nodeId);
+
+ MvccSnapshotResponseListener lsnr;
+
+ for (Long id : map.keySet()) {
+ if ((lsnr = map.remove(id)) != null)
+ lsnr.onError(ex);
+ }
+ }
+
+ // 2. Notify acknowledge futures.
+ for (WaitAckFuture fut : ackFuts.values())
+ fut.onNodeLeft(nodeId);
+ }
+
/**
* Coordinator change callback. Performs all needed actions for handling new coordinator assignment.
*
- * @param nodes Cluster topology snapshot.
- * @param topVer Topology version.
* @param sndQrys {@code True} if it is need to send an active queries list to the new coordinator.
*/
- private void onCoordinatorChanged(Collection<ClusterNode> nodes, long topVer, boolean sndQrys) {
+ private void onCoordinatorChanged(AffinityTopologyVersion topVer, Collection<ClusterNode> nodes, boolean sndQrys) {
MvccCoordinator newCrd = pickMvccCoordinator(nodes, topVer);
- if (newCrd == null)
+ if (newCrd.disconnected()) {
+ curCrd = newCrd;
+
return;
+ }
- // Update current coordinator, collect active queries and send it to the new coordinator if needed.
- GridLongList activeQryTrackers = null;
+ assert newCrd.topologyVersion().compareTo(curCrd.topologyVersion()) > 0;
- synchronized (activeTrackers) {
- assert curCrd == null || newCrd.topologyVersion().compareTo(curCrd.topologyVersion()) > 0;
+ curCrd = newCrd;
- if (sndQrys) {
- activeQryTrackers = new GridLongList();
+ if (newCrd.local() && !sndQrys)
+ // Coordinator was assigned on local join. There was no coordinator before.
+ prevCrdQueries.init();
- for (MvccQueryTracker tracker : activeTrackers.values()) {
- long trackerId = tracker.onMvccCoordinatorChange(newCrd);
+ if (sndQrys) {
+ GridLongList activeQryTrackers = new GridLongList();
- if (trackerId != MVCC_TRACKER_ID_NA)
- activeQryTrackers.add(trackerId);
- }
- }
+ for (MvccQueryTracker tracker : activeTrackers.values()) {
+ long trackerId = tracker.onMvccCoordinatorChange(newCrd);
- curCrd = newCrd;
- }
+ if (trackerId != MVCC_TRACKER_ID_NA)
+ activeQryTrackers.add(trackerId);
+ }
- // Send local active queries to remote coordinator, if needed.
- if (!newCrd.nodeId().equals(ctx.localNodeId())) {
- try {
- if (sndQrys)
+ if (newCrd.local())
+ prevCrdQueries.init(activeQryTrackers, nodes, ctx.discovery());
+ else {
+ try {
sendMessage(newCrd.nodeId(), new MvccActiveQueriesMessage(activeQryTrackers));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send active queries to mvcc coordinator: " + e);
+ }
}
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send active queries to mvcc coordinator: " + e);
- }
- }
- // If a current node was elected as a new mvcc coordinator, we need to pre-initialize it.
- else {
- assert crdVer == 0 : crdVer;
-
- crdVer = newCrd.coordinatorVersion();
-
- if (log.isInfoEnabled())
- log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() +
- ", crdVer=" + crdVer + ']');
-
- prevCrdQueries.init(activeQryTrackers, F.view(nodes, this::supportsMvcc), ctx.discovery());
-
- // Do not complete init future here, because we should wait until all old transactions become terminated.
}
+ }
- crdChanged = true;
+ /**
+ * @param currCrd Current Mvcc coordinator.
+ * @param from Start topology version.
+ * @param to End topology version
+ * @return {@code True} if coordinator was changed between two passed topology versions.
+ */
+ private boolean coordinatorChanged(MvccCoordinator currCrd, AffinityTopologyVersion from,
+ AffinityTopologyVersion to) {
+ return from.compareTo(currCrd.topologyVersion()) < 0
+ && to.compareTo(currCrd.topologyVersion()) >= 0;
}
/**
@@ -566,53 +565,58 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
onTxDone(txCntr, true);
}
- /** {@inheritDoc} */
- @Override public void processClientActiveQueries(UUID nodeId, @Nullable GridLongList activeQueries) {
- prevCrdQueries.addNodeActiveQueries(nodeId, activeQueries);
- }
+ /**
+ * Initializes a new coordinator.
+ */
+ private void initialize(MvccCoordinator curCrd0) {
+ readyVer = curCrd0.topologyVersion();
- /** {@inheritDoc} */
- @Override @Nullable public MvccCoordinator currentCoordinator() {
- return curCrd;
+ curCrd0.initialized(true);
+
+ // Complete init future if local node is a new coordinator. All previous txs have been already completed here.
+ if (curCrd0.local())
+ ctx.closure().runLocalSafe(initFut::onDone);
}
/** {@inheritDoc} */
- @Override public UUID currentCoordinatorId() {
- MvccCoordinator curCrd = this.curCrd;
-
- return curCrd != null ? curCrd.nodeId() : null;
+ @Override @NotNull public MvccCoordinator currentCoordinator() {
+ return curCrd;
}
/** {@inheritDoc} */
- @Override public byte state(MvccVersion ver) throws IgniteCheckedException {
+ @Override public byte state(MvccVersion ver) {
return state(ver.coordinatorVersion(), ver.counter());
}
/** {@inheritDoc} */
- @Override public byte state(long crdVer, long cntr) throws IgniteCheckedException {
+ @Override public byte state(long crdVer, long cntr) {
assert txLog != null && mvccEnabled;
- return txLog.get(crdVer, cntr);
+ try {
+ return txLog.get(crdVer, cntr);
+ }
+ catch (IgniteCheckedException e) {
+ ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+ }
+
+ return TxState.NA;
}
/** {@inheritDoc} */
- @Override public void updateState(MvccVersion ver, byte state) throws IgniteCheckedException {
+ @Override public void updateState(MvccVersion ver, byte state) {
updateState(ver, state, true);
}
/** {@inheritDoc} */
- @Override public void updateState(MvccVersion ver, byte state, boolean primary) throws IgniteCheckedException {
+ @Override public void updateState(MvccVersion ver, byte state, boolean primary) {
assert txLog != null && mvccEnabled;
- TxKey key = new TxKey(ver.coordinatorVersion(), ver.counter());
-
- txLog.put(key, state, primary);
-
- Waiter waiter;
-
- if (primary && (state == TxState.ABORTED || state == TxState.COMMITTED)
- && (waiter = waitMap.remove(key)) != null)
- waiter.run(ctx);
+ try {
+ txLog.put(new TxKey(ver.coordinatorVersion(), ver.counter()), state, primary);
+ }
+ catch (IgniteCheckedException e) {
+ ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+ }
}
/** {@inheritDoc} */
@@ -630,29 +634,32 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> waitFor(GridCacheContext cctx, MvccVersion locked) throws IgniteCheckedException {
+ @Override public IgniteInternalFuture<Void> waitFor(GridCacheContext cctx, MvccVersion locked) {
TxKey key = new TxKey(locked.coordinatorVersion(), locked.counter());
LockFuture fut = new LockFuture(cctx.ioPolicy());
Waiter waiter = waitMap.merge(key, fut, Waiter::concat);
- byte state = txLog.get(key);
-
- if ((state == TxState.ABORTED || state == TxState.COMMITTED)
- && !waiter.hasLocalTransaction() && (waiter = waitMap.remove(key)) != null)
+ if (!waiter.hasLocalTransaction() && (waiter = waitMap.remove(key)) != null)
waiter.run(ctx);
return fut;
}
/** {@inheritDoc} */
+ @Override public void releaseWaiters(MvccVersion locked) {
+ Waiter waiter = waitMap.remove(new TxKey(locked.coordinatorVersion(), locked.counter()));
+
+ if (waiter != null)
+ waiter.run(ctx);
+ }
+
+ /** {@inheritDoc} */
@Override public void addQueryTracker(MvccQueryTracker tracker) {
assert tracker.id() != MVCC_TRACKER_ID_NA;
- MvccQueryTracker tr = activeTrackers.put(tracker.id(), tracker);
-
- assert tr == null;
+ activeTrackers.putIfAbsent(tracker.id(), tracker);
}
/** {@inheritDoc} */
@@ -661,81 +668,73 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
}
/** {@inheritDoc} */
- @Override public MvccSnapshot tryRequestSnapshotLocal() throws ClusterTopologyCheckedException {
- return tryRequestSnapshotLocal(null);
+ @Override public MvccSnapshot requestWriteSnapshotLocal() {
+ if (!currentCoordinator().local() || !initFut.isDone())
+ return null;
+
+ return assignTxSnapshot(0L, ctx.localNodeId(), false);
}
/** {@inheritDoc} */
- @Override public MvccSnapshot tryRequestSnapshotLocal(@Nullable IgniteInternalTx tx) throws ClusterTopologyCheckedException {
- MvccCoordinator crd = currentCoordinator();
+ @Override public MvccSnapshot requestReadSnapshotLocal() {
+ if (!currentCoordinator().local() || !initFut.isDone())
+ return null;
- if (crd == null)
- throw noCoordinatorError();
+ return activeQueries.assignQueryCounter(ctx.localNodeId(), 0L);
+ }
- if (tx != null) {
- AffinityTopologyVersion topVer = ctx.cache().context().lockedTopologyVersion(null);
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<MvccSnapshot> requestReadSnapshotAsync() {
+ MvccSnapshotFuture fut = new MvccSnapshotFuture();
- if (topVer != null && topVer.compareTo(crd.topologyVersion()) < 0)
- throw new ClusterTopologyCheckedException("Mvcc coordinator is outdated " +
- "for the locked topology version. [crd=" + crd + ", tx=" + tx + ']');
- }
+ requestReadSnapshotAsync(currentCoordinator(), fut);
- if (!ctx.localNodeId().equals(crd.nodeId()) || !initFut.isDone())
- return null;
- else if (tx != null)
- return assignTxSnapshot(0L, ctx.localNodeId(), false);
- else
- return activeQueries.assignQueryCounter(ctx.localNodeId(), 0L);
+ return fut;
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync(IgniteInternalTx tx) {
+ @Override public IgniteInternalFuture<MvccSnapshot> requestWriteSnapshotAsync() {
MvccSnapshotFuture fut = new MvccSnapshotFuture();
- requestSnapshotAsync(tx, fut);
+ requestWriteSnapshotAsync(currentCoordinator(), fut);
return fut;
}
/** {@inheritDoc} */
- @Override public void requestSnapshotAsync(MvccSnapshotResponseListener lsnr) {
- requestSnapshotAsync(null, lsnr);
+ @Override public void requestReadSnapshotAsync(MvccCoordinator crd, MvccSnapshotResponseListener lsnr) {
+ requestSnapshotAsync(crd, lsnr, true);
}
/** {@inheritDoc} */
- @Override public void requestSnapshotAsync(IgniteInternalTx tx, MvccSnapshotResponseListener lsnr) {
- MvccCoordinator crd = currentCoordinator();
+ @Override public void requestWriteSnapshotAsync(MvccCoordinator crd, MvccSnapshotResponseListener lsnr) {
+ requestSnapshotAsync(crd, lsnr, false);
+ }
- if (crd == null) {
+ /** */
+ private void requestSnapshotAsync(MvccCoordinator crd, MvccSnapshotResponseListener lsnr, boolean forRead) {
+ if (crd.disconnected()) {
lsnr.onError(noCoordinatorError());
return;
}
- if (tx != null) {
- AffinityTopologyVersion topVer = ctx.cache().context().lockedTopologyVersion(null);
-
- if (topVer != null && topVer.compareTo(crd.topologyVersion()) < 0) {
- lsnr.onError(new ClusterTopologyCheckedException("Mvcc coordinator is outdated " +
- "for the locked topology version. [crd=" + crd + ", tx=" + tx + ']'));
-
- return;
- }
- }
-
if (ctx.localNodeId().equals(crd.nodeId())) {
if (!initFut.isDone()) {
// Wait for the local coordinator init.
initFut.listen(new IgniteInClosure<IgniteInternalFuture>() {
@Override public void apply(IgniteInternalFuture fut) {
- requestSnapshotAsync(tx, lsnr);
+ if (forRead)
+ lsnr.onResponse(activeQueries.assignQueryCounter(ctx.localNodeId(), 0L));
+ else
+ lsnr.onResponse(assignTxSnapshot(0L, ctx.localNodeId(), false));
}
});
}
- else if (tx != null)
- lsnr.onResponse(assignTxSnapshot(0L, ctx.localNodeId(), false));
- else
+ else if (forRead)
lsnr.onResponse(activeQueries.assignQueryCounter(ctx.localNodeId(), 0L));
+ else
+ lsnr.onResponse(assignTxSnapshot(0L, ctx.localNodeId(), false));
return;
}
@@ -753,7 +752,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
map.put(id, lsnr);
try {
- sendMessage(nodeId, tx != null ? new MvccTxSnapshotRequest(id) : new MvccQuerySnapshotRequest(id));
+ sendMessage(nodeId, forRead ? new MvccQuerySnapshotRequest(id) : new MvccTxSnapshotRequest(id));
}
catch (IgniteCheckedException e) {
if (map.remove(id) != null)
@@ -763,22 +762,14 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Void> ackTxCommit(MvccSnapshot updateVer) {
- return ackTxCommit(updateVer, null, 0L);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> ackTxCommit(MvccVersion updateVer, MvccSnapshot readSnapshot,
- long qryId) {
assert updateVer != null;
MvccCoordinator crd = curCrd;
- if (updateVer.coordinatorVersion() == crd.coordinatorVersion())
- return sendTxCommit(crd, createTxAckMessage(futIdCntr.incrementAndGet(), updateVer, readSnapshot, qryId));
- else if (readSnapshot != null)
- ackQueryDone(readSnapshot, qryId);
+ if (crd.disconnected() || crd.version() != updateVer.coordinatorVersion())
+ return new GridFinishedFuture<>();
- return new GridFinishedFuture<>();
+ return sendTxCommit(crd, new MvccAckRequestTx(futIdCntr.incrementAndGet(), updateVer.counter()));
}
/** {@inheritDoc} */
@@ -787,39 +778,10 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
MvccCoordinator crd = curCrd;
- if (crd.coordinatorVersion() != updateVer.coordinatorVersion())
+ if (crd.disconnected() || crd.version() != updateVer.coordinatorVersion())
return;
- MvccAckRequestTx msg = createTxAckMessage(-1, updateVer, null, 0L);
-
- msg.skipResponse(true);
-
- try {
- sendMessage(crd.nodeId(), msg);
- }
- catch (ClusterTopologyCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to send tx rollback ack, node left [msg=" + msg + ", node=" + crd.nodeId() + ']');
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send tx rollback ack [msg=" + msg + ", node=" + crd.nodeId() + ']', e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void ackTxRollback(MvccVersion updateVer, MvccSnapshot readSnapshot, long qryTrackerId) {
- assert updateVer != null;
-
- MvccCoordinator crd = curCrd;
-
- if (crd.coordinatorVersion() != updateVer.coordinatorVersion()) {
- if (readSnapshot != null)
- ackQueryDone(readSnapshot, qryTrackerId);
-
- return;
- }
-
- MvccAckRequestTx msg = createTxAckMessage(-1, updateVer, readSnapshot, qryTrackerId);
+ MvccAckRequestTx msg = new MvccAckRequestTx((long)-1, updateVer.counter());
msg.skipResponse(true);
@@ -837,44 +799,20 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
/** {@inheritDoc} */
@Override public void ackQueryDone(MvccSnapshot snapshot, long qryId) {
- assert snapshot != null;
-
MvccCoordinator crd = currentCoordinator();
- if (crd == null || crd.coordinatorVersion() == snapshot.coordinatorVersion()
- && sendQueryDone(crd, new MvccAckRequestQueryCntr(queryTrackCounter(snapshot))))
+ if (crd.disconnected() || snapshot == null)
return;
- Message msg = new MvccAckRequestQueryId(qryId);
-
- do {
- crd = currentCoordinator();
- }
- while (!sendQueryDone(crd, msg));
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> waitTxsFuture(UUID crdId, GridLongList txs) {
- assert crdId != null;
- assert txs != null && !txs.isEmpty();
-
- WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crdId, false);
-
- ackFuts.put(fut.id, fut);
+ if (crd.version() != snapshot.coordinatorVersion()
+ || !sendQueryDone(crd, new MvccAckRequestQueryCntr(queryTrackCounter(snapshot)))) {
+ Message msg = new MvccAckRequestQueryId(qryId);
- try {
- sendMessage(crdId, new MvccWaitTxsRequest(fut.id, txs));
- }
- catch (IgniteCheckedException e) {
- if (ackFuts.remove(fut.id) != null) {
- if (e instanceof ClusterTopologyCheckedException)
- fut.onDone(); // No need to wait, new coordinator will be assigned, finish without error.
- else
- fut.onDone(e);
+ do {
+ crd = currentCoordinator();
}
+ while (!sendQueryDone(crd, msg));
}
-
- return fut;
}
/** {@inheritDoc} */
@@ -941,13 +879,9 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
/**
* Picks mvcc coordinator from the given list of nodes.
*
- * @param nodes List of nodes.
- * @param topVer Topology version.
* @return Chosen mvcc coordinator.
*/
- private MvccCoordinator pickMvccCoordinator(Collection<ClusterNode> nodes, long topVer) {
- checkMvccSupported(nodes);
-
+ private @NotNull MvccCoordinator pickMvccCoordinator(Collection<ClusterNode> nodes, AffinityTopologyVersion topVer) {
ClusterNode crdNode = null;
if (crdC != null) {
@@ -967,18 +901,17 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
}
}
- MvccCoordinator crd = crdNode != null ? new MvccCoordinator(crdNode.id(), coordinatorVersion(crdNode),
- new AffinityTopologyVersion(topVer, 0)) : null;
+ MvccCoordinator crd = crdNode != null ? new MvccCoordinator(topVer, crdNode.id(), coordinatorVersion(crdNode),
+ crdNode.isLocal()) : MvccCoordinator.DISCONNECTED_COORDINATOR;
- if (log.isInfoEnabled() && crd != null)
- log.info("Assigned mvcc coordinator [crd=" + crd + ", crdNode=" + crdNode + ']');
- else if (crd == null)
+ if (crd.disconnected())
U.warn(log, "New mvcc coordinator was not assigned [topVer=" + topVer + ']');
+ else if (log.isInfoEnabled())
+ log.info("Assigned mvcc coordinator [crd=" + crd + ']');
return crd;
}
-
/**
* @param crdNode Assigned coordinator node.
* @return Coordinator version.
@@ -1034,9 +967,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
/** */
private MvccSnapshotResponse assignTxSnapshot(long futId, UUID nearId, boolean client) {
- assert initFut.isDone();
- assert crdVer != 0;
- assert ctx.localNodeId().equals(currentCoordinatorId());
+ assert initFut.isDone() && curCrd.local();
MvccSnapshotResponse res = new MvccSnapshotResponse();
@@ -1068,7 +999,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
cleanup = prevCrdQueries.previousQueriesDone() ? cleanup - 1 : MVCC_COUNTER_NA;
- res.init(futId, crdVer, ver, MVCC_START_OP_CNTR, cleanup, tracking);
+ res.init(futId, curCrd.version(), ver, MVCC_START_OP_CNTR, cleanup, tracking);
return res;
}
@@ -1077,19 +1008,12 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
private void onTxDone(Long txCntr, boolean increaseCommittedCntr) {
assert initFut.isDone();
- GridFutureAdapter fut;
-
synchronized (this) {
activeTxs.remove(txCntr);
if (increaseCommittedCntr)
committedCntr.setIfGreater(txCntr);
}
-
- fut = waitTxFuts.remove(txCntr);
-
- if (fut != null)
- fut.onDone();
}
/**
@@ -1100,23 +1024,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
}
/**
- * @param futId Future ID.
- * @param updateVer Update version.
- * @param readSnapshot Optional read version.
- * @param qryTrackerId Query tracker id.
- * @return Message.
- */
- private MvccAckRequestTx createTxAckMessage(long futId, MvccVersion updateVer, MvccSnapshot readSnapshot,
- long qryTrackerId) {
- if (readSnapshot == null)
- return new MvccAckRequestTx(futId, updateVer.counter());
- else if (readSnapshot.coordinatorVersion() == updateVer.coordinatorVersion())
- return new MvccAckRequestTxAndQueryCntr(futId, updateVer.counter(), queryTrackCounter(readSnapshot));
- else
- return new MvccAckRequestTxAndQueryId(futId, updateVer.counter(), qryTrackerId);
- }
-
- /**
* @param mvccVer Read version.
* @return Tracker counter.
*/
@@ -1217,27 +1124,18 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
MvccCoordinator crd0 = currentCoordinator();
- if (Thread.currentThread().isInterrupted() ||
- crd0 == null ||
- crdVer == 0 && ctx.localNodeId().equals(crd0.nodeId()))
+ if (!crd0.initialized() || Thread.currentThread().isInterrupted())
return new GridFinishedFuture<>(new VacuumMetrics());
final GridFutureAdapter<VacuumMetrics> res = new GridFutureAdapter<>();
- MvccSnapshot snapshot;
-
- try {
- // TODO IGNITE-8974 create special method for getting cleanup version only.
- snapshot = tryRequestSnapshotLocal(DUMMY_TX);
- }
- catch (ClusterTopologyCheckedException e) {
- throw new AssertionError(e);
- }
+ // TODO IGNITE-8974 create special method for getting cleanup version only.
+ MvccSnapshot snapshot = requestWriteSnapshotLocal();
if (snapshot != null)
continueRunVacuum(res, snapshot);
else
- requestSnapshotAsync(DUMMY_TX, new MvccSnapshotResponseListener() {
+ requestWriteSnapshotAsync(crd0, new MvccSnapshotResponseListener() {
@Override public void onResponse(MvccSnapshot s) {
continueRunVacuum(res, s);
}
@@ -1327,23 +1225,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
try {
metrics = future.get();
- if (U.assertionsEnabled()) {
- MvccCoordinator crd = currentCoordinator();
-
- assert crd != null
- && crd.coordinatorVersion() >= snapshot.coordinatorVersion();
-
- for (TxKey key : waitMap.keySet()) {
- if (!( key.major() == snapshot.coordinatorVersion()
- && key.minor() > snapshot.cleanupVersion()
- || key.major() > snapshot.coordinatorVersion())) {
- byte state = state(key.major(), key.minor());
-
- assert state == TxState.ABORTED : "tx state=" + state;
- }
- }
- }
-
txLog.removeUntil(snapshot.coordinatorVersion(), snapshot.cleanupVersion());
if (log.isDebugEnabled())
@@ -1384,23 +1265,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
return new NodeStoppingException("Operation has been cancelled (node is stopping).");
}
- /**
- * @param nodeId Node ID.
- * @param msg Message.
- */
- private void sendFutureResponse(UUID nodeId, MvccWaitTxsRequest msg) {
- try {
- sendMessage(nodeId, new MvccFutureResponse(msg.futureId()));
- }
- catch (ClusterTopologyCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']');
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e);
- }
- }
-
/** */
@NotNull private IgniteInternalFuture<Void> sendTxCommit(MvccCoordinator crd, MvccAckRequestTx msg) {
WaitAckFuture fut = new WaitAckFuture(msg.futureId(), crd.nodeId(), true);
@@ -1431,8 +1295,9 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
* @param msg Message.
* @return {@code True} if no need to resend the message to a new coordinator.
*/
+ @SuppressWarnings("BooleanMethodIsAlwaysInverted")
private boolean sendQueryDone(MvccCoordinator crd, Message msg) {
- if (crd == null)
+ if (crd.disconnected())
return true; // no need to send ack;
try {
@@ -1447,7 +1312,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
MvccCoordinator crd0 = currentCoordinator();
// Coordinator is unassigned or still the same.
- return crd0 == null || crd.coordinatorVersion() == crd0.coordinatorVersion();
+ return crd0.disconnected() || crd.version() == crd0.version();
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send query ack [crd=" + crd + ", msg=" + msg + ']', e);
@@ -1613,60 +1478,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
* @param nodeId Node ID.
* @param msg Message.
*/
- @SuppressWarnings("unchecked")
- private void processCoordinatorWaitTxsRequest(final UUID nodeId, final MvccWaitTxsRequest msg) {
- GridLongList txs = msg.transactions();
-
- GridCompoundFuture resFut = null;
-
- for (int i = 0; i < txs.size(); i++) {
- Long txId = txs.get(i);
-
- GridFutureAdapter fut = waitTxFuts.get(txId);
-
- if (fut == null) {
- GridFutureAdapter old = waitTxFuts.putIfAbsent(txId, fut = new GridFutureAdapter());
-
- if (old != null)
- fut = old;
- }
-
- boolean isDone;
-
- synchronized (this) {
- isDone = !activeTxs.containsKey(txId);
- }
-
- if (isDone)
- fut.onDone();
-
- if (!fut.isDone()) {
- if (resFut == null)
- resFut = new GridCompoundFuture();
-
- resFut.add(fut);
- }
- }
-
- if (resFut != null)
- resFut.markInitialized();
-
- if (resFut == null || resFut.isDone())
- sendFutureResponse(nodeId, msg);
- else {
- resFut.listen(new IgniteInClosure<IgniteInternalFuture>() {
- @Override public void apply(IgniteInternalFuture fut) {
- sendFutureResponse(nodeId, msg);
- }
- });
- }
- }
-
- /**
- * @param nodeId Node ID.
- * @param msg Message.
- */
- private void processCoordinatorActiveQueriesMessage(UUID nodeId, MvccActiveQueriesMessage msg) {
+ private void processActiveQueriesMessage(UUID nodeId, MvccActiveQueriesMessage msg) {
prevCrdQueries.addNodeActiveQueries(nodeId, msg.activeQueries());
}
@@ -1722,7 +1534,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
if (minQry == null)
minQry = tracking;
- res.init(futId, crdVer, ver, MVCC_READ_OP_CNTR, MVCC_COUNTER_NA, tracking);
+ res.init(futId, curCrd.version(), ver, MVCC_READ_OP_CNTR, MVCC_COUNTER_NA, tracking);
return res;
}
@@ -1827,7 +1639,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
if (msg0.waitForCoordinatorInit() && !initFut.isDone()) {
initFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
@Override public void apply(IgniteInternalFuture<Void> future) {
- assert crdVer != 0L;
+ assert curCrd.local();
processMessage(nodeId, msg);
}
@@ -1856,12 +1668,10 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
processCoordinatorQuerySnapshotRequest(nodeId, (MvccQuerySnapshotRequest)msg);
else if (msg instanceof MvccSnapshotResponse)
processCoordinatorSnapshotResponse(nodeId, (MvccSnapshotResponse)msg);
- else if (msg instanceof MvccWaitTxsRequest)
- processCoordinatorWaitTxsRequest(nodeId, (MvccWaitTxsRequest)msg);
else if (msg instanceof MvccAckRequestQueryId)
processNewCoordinatorQueryAckRequest(nodeId, (MvccAckRequestQueryId)msg);
else if (msg instanceof MvccActiveQueriesMessage)
- processCoordinatorActiveQueriesMessage(nodeId, (MvccActiveQueriesMessage)msg);
+ processActiveQueriesMessage(nodeId, (MvccActiveQueriesMessage)msg);
else if (msg instanceof MvccRecoveryFinishedMessage)
processRecoveryFinishedMessage(nodeId, ((MvccRecoveryFinishedMessage)msg));
else
@@ -2377,10 +2187,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
* @param row Mvcc row to check.
* @param snapshot Cleanup version to compare with.
* @param cctx Cache context.
- * @throws IgniteCheckedException If failed.
*/
- private boolean canClean(MvccDataRow row, MvccSnapshot snapshot,
- GridCacheContext cctx) throws IgniteCheckedException {
+ private boolean canClean(MvccDataRow row, MvccSnapshot snapshot, GridCacheContext cctx) {
// Row can be safely cleaned if it has ABORTED min version or COMMITTED and less than cleanup one max version.
return compare(row, snapshot.coordinatorVersion(), snapshot.cleanupVersion()) <= 0
&& hasNewVersion(row) && MvccUtils.compareNewVersion(row, snapshot.coordinatorVersion(), snapshot.cleanupVersion()) <= 0
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index f143a43..c8ce98e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -21,9 +21,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
/**
* Mvcc tracker.
@@ -63,36 +60,11 @@ public interface MvccQueryTracker {
public IgniteInternalFuture<MvccSnapshot> requestSnapshot();
/**
- * Requests version on coordinator.
- *
- * @param topVer Topology version.
- * @return Future to wait for result.
- */
- public IgniteInternalFuture<MvccSnapshot> requestSnapshot(@NotNull AffinityTopologyVersion topVer);
-
- /**
- * Requests version on coordinator.
- *
- * @param topVer Topology version.
- * @param lsnr Response listener.
- */
- public void requestSnapshot(@NotNull AffinityTopologyVersion topVer, @NotNull MvccSnapshotResponseListener lsnr);
-
- /**
* Marks tracker as done.
*/
public void onDone();
/**
- * Marks tracker as done.
- *
- * @param tx Transaction.
- * @param commit Commit flag.
- * @return Acknowledge future.
- */
- @Nullable public IgniteInternalFuture<Void> onDone(@NotNull GridNearTxLocal tx, boolean commit);
-
- /**
* Mvcc coordinator change callback.
*
* @param newCrd New mvcc coordinator.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
index d86f5ec..89a2300 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
@@ -24,11 +24,9 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.NotNull;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.noCoordinatorError;
@@ -53,32 +51,20 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker {
private final long id;
/** */
- private MvccSnapshot snapshot;
+ private Object state;
/** */
private volatile AffinityTopologyVersion topVer;
/** */
- private final boolean canRemap;
-
- /** */
private boolean done;
/**
* @param cctx Cache context.
*/
public MvccQueryTrackerImpl(GridCacheContext cctx) {
- this(cctx, true);
- }
-
- /**
- * @param cctx Cache context.
- * @param canRemap {@code True} if tracker can remap on coordinator fail.
- */
- public MvccQueryTrackerImpl(GridCacheContext cctx, boolean canRemap) {
this.cctx = cctx;
this.id = ID_CNTR.incrementAndGet();
- this.canRemap = canRemap;
log = cctx.logger(getClass());
}
@@ -89,8 +75,22 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker {
}
/** {@inheritDoc} */
- @Override public synchronized MvccSnapshot snapshot() {
- return snapshot;
+ @Override public MvccSnapshot snapshot() {
+ Object state0;
+
+ synchronized (this) {
+ state0 = state;
+ }
+
+ return snapshot(state0);
+ }
+
+ /** */
+ private MvccSnapshot snapshot(Object state) {
+ if (state != null && state.getClass() == SnapshotFuture.class)
+ return ((SnapshotFuture)state).result();
+ else
+ return (MvccSnapshot)state;
}
/** {@inheritDoc} */
@@ -105,83 +105,49 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<MvccSnapshot> requestSnapshot() {
- MvccSnapshot snapshot; MvccSnapshotFuture fut;
-
- if ((snapshot = snapshot()) != null)
- return new GridFinishedFuture<>(snapshot);
+ SnapshotFuture fut;
- requestSnapshot0(cctx.shared().exchange().readyAffinityVersion(), fut = new MvccSnapshotFuture());
-
- return fut;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshot(@NotNull AffinityTopologyVersion topVer) {
- MvccSnapshot snapshot; MvccSnapshotFuture fut;
-
- if ((snapshot = snapshot()) != null)
- return new GridFinishedFuture<>(snapshot);
+ synchronized (this) {
+ if (state == null)
+ state = fut = new SnapshotFuture();
+ else if (state.getClass() == SnapshotFuture.class)
+ return (IgniteInternalFuture<MvccSnapshot>)state;
+ else
+ return new GridFinishedFuture<>((MvccSnapshot)state);
+ }
- requestSnapshot0(topVer, fut = new MvccSnapshotFuture());
+ requestSnapshot0(cctx.shared().exchange().readyAffinityVersion(), fut);
return fut;
}
/** {@inheritDoc} */
- @Override public void requestSnapshot(@NotNull AffinityTopologyVersion topVer, @NotNull MvccSnapshotResponseListener lsnr) {
- MvccSnapshot snapshot = snapshot();
-
- if (snapshot != null)
- lsnr.onResponse(snapshot);
- else
- requestSnapshot0(topVer, lsnr);
- }
-
- /** {@inheritDoc} */
@Override public void onDone() {
- if (!checkDone())
- return;
+ Object state0;
- MvccProcessor prc = cctx.shared().coordinators();
-
- MvccSnapshot snapshot = snapshot();
-
- if (snapshot != null) {
- prc.removeQueryTracker(id);
+ synchronized (this) {
+ if (done)
+ return;
- prc.ackQueryDone(snapshot, id);
+ state0 = state;
+ done = true;
}
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> onDone(@NotNull GridNearTxLocal tx, boolean commit) {
- MvccSnapshot snapshot = snapshot(), txSnapshot = tx.mvccSnapshot();
-
- if (!checkDone() || snapshot == null && txSnapshot == null)
- return commit ? new GridFinishedFuture<>() : null;
- MvccProcessor prc = cctx.shared().coordinators();
+ cctx.shared().coordinators().removeQueryTracker(id);
- if (snapshot != null)
- prc.removeQueryTracker(id);
-
- if (txSnapshot == null)
- prc.ackQueryDone(snapshot, id);
- else if (commit)
- return prc.ackTxCommit(txSnapshot, snapshot, id);
+ if (state0 != null && state0.getClass() == SnapshotFuture.class)
+ ((SnapshotFuture)state0).cancel();
else
- prc.ackTxRollback(txSnapshot, snapshot, id);
-
- return null;
+ ackQueryDone((MvccSnapshot)state0);
}
/** {@inheritDoc} */
- @Override public synchronized long onMvccCoordinatorChange(MvccCoordinator newCrd) {
- if (snapshot != null) {
+ @Override public synchronized long onMvccCoordinatorChange(@NotNull MvccCoordinator newCrd) {
+ if (snapshot(state) != null) {
assert crdVer != 0 : this;
- if (crdVer != newCrd.coordinatorVersion()) {
- crdVer = newCrd.coordinatorVersion();
+ if (crdVer != newCrd.version()) {
+ crdVer = newCrd.version();
return id;
}
@@ -195,141 +161,141 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker {
}
/** */
- private void requestSnapshot0(AffinityTopologyVersion topVer, MvccSnapshotResponseListener lsnr) {
- if (checkTopology(topVer, lsnr = decorate(lsnr))) {
- try {
- MvccSnapshot snapshot = cctx.shared().coordinators().tryRequestSnapshotLocal();
-
- if (snapshot == null)
- cctx.shared().coordinators().requestSnapshotAsync(lsnr);
- else
- lsnr.onResponse(snapshot);
- }
- catch (ClusterTopologyCheckedException e) {
- lsnr.onError(e);
+ private void requestSnapshot0(@NotNull AffinityTopologyVersion topVer, @NotNull MvccSnapshotResponseListener lsnr) {
+ MvccCoordinator crd = coordinator(); AffinityTopologyVersion crdTopVer = crd.topologyVersion();
+
+ if (!crdTopVer.initialized())
+ lsnr.onError(noCoordinatorError());
+ else if (crdTopVer.compareTo(topVer) <= 0) {
+ synchronized (this) {
+ if (done)
+ return;
+
+ this.crdVer = crd.version();
}
- }
- }
- /** */
- private MvccSnapshotResponseListener decorate(MvccSnapshotResponseListener lsnr) {
- assert lsnr != null;
+ this.topVer = topVer;
- if (lsnr.getClass() == ListenerDecorator.class)
- return lsnr;
+ cctx.shared().coordinators().addQueryTracker(this);
- return new ListenerDecorator(lsnr);
+ cctx.shared().coordinators().requestReadSnapshotAsync(crd, lsnr);
+ }
+ else
+ remap(crdTopVer, lsnr);
}
/**
- * Validates if mvcc snapshot could be requested on the given topology.
- *
- * @return {@code True} if topology is valid.
+ * @param res Response.
+ * @param lsnr Response listener.
+ * @return {@code false} if need to remap.
*/
- private boolean checkTopology(AffinityTopologyVersion topVer, MvccSnapshotResponseListener lsnr) {
- MvccCoordinator crd = cctx.shared().coordinators().currentCoordinator();
-
- if (crd == null) {
- lsnr.onError(noCoordinatorError(topVer));
-
- return false;
- }
-
- this.topVer = topVer;
+ private boolean onResponse0(@NotNull MvccSnapshot res, @NotNull MvccSnapshotResponseListener lsnr) {
+ boolean ackQueryDone = false, needRemap = false;
synchronized (this) {
- crdVer = crd.coordinatorVersion();
- }
+ assert snapshot(state) == null : "[this=" + this + ", rcvdVer=" + res + "]";
- return true;
- }
+ if (!done && crdVer != 0) {
+ this.state = res;
- /** */
- private void tryRemap(MvccSnapshotResponseListener lsnr) {
- if (!canRemap) {
- lsnr.onError(new ClusterTopologyCheckedException("Failed to request mvcc version, coordinator failed."));
+ return true;
+ }
- return;
+ if (crdVer != 0)
+ ackQueryDone = true;
+ else if (!done)
+ needRemap = true;
}
- IgniteInternalFuture<AffinityTopologyVersion> waitFut =
- cctx.shared().exchange().affinityReadyFuture(topVer.nextMinorVersion());
-
- if(log.isDebugEnabled())
- log.debug("Remap on new topology: " + waitFut);
-
- if (waitFut == null)
- requestSnapshot(cctx.shared().exchange().readyAffinityVersion(), lsnr);
- else {
- waitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- try {
- requestSnapshot(fut.get(), lsnr);
- }
- catch (IgniteCheckedException e) {
- lsnr.onError(e);
- }
- }
- });
- }
+ if (needRemap) // Coordinator is failed or reassigned, need remap.
+ tryRemap(coordinator().topologyVersion(), lsnr);
+ else if (ackQueryDone) // Coordinator is not failed, but the tracker is already closed.
+ ackQueryDone(res);
+
+ return false;
}
/**
- * @param res Response.
+ * @param e Exception.
* @param lsnr Response listener.
* @return {@code false} if need to remap.
*/
- private boolean onResponse0(@NotNull MvccSnapshot res, MvccSnapshotResponseListener lsnr) {
- boolean needRemap = false;
-
+ private boolean onError0(IgniteCheckedException e, @NotNull MvccSnapshotResponseListener lsnr) {
synchronized (this) {
- assert snapshot() == null : "[this=" + this + ", rcvdVer=" + res + "]";
-
- if (crdVer != 0) {
- this.snapshot = res;
- }
- else
- needRemap = true;
+ if (done)
+ return false;
}
- if (needRemap) { // Coordinator failed or reassigned, need remap.
- tryRemap(lsnr);
+ if (e instanceof ClusterTopologyCheckedException
+ && !(e instanceof ClusterTopologyServerNotFoundException)) {
+ tryRemap(coordinator().topologyVersion(), lsnr);
return false;
}
- cctx.shared().coordinators().addQueryTracker(this);
+ cctx.shared().coordinators().removeQueryTracker(id);
return true;
}
- /**
- * @param e Exception.
- * @param lsnr Response listener.
- * @return {@code false} if need to remap.
- */
- private boolean onError0(IgniteCheckedException e, MvccSnapshotResponseListener lsnr) {
- if (e instanceof ClusterTopologyCheckedException && canRemap) {
- if (e instanceof ClusterTopologyServerNotFoundException)
- return true; // No Mvcc coordinator assigned
+ /** */
+ private void tryRemap(@NotNull AffinityTopologyVersion mapVer, @NotNull MvccSnapshotResponseListener lsnr) {
+ if (!mapVer.initialized())
+ lsnr.onError(noCoordinatorError());
+ else
+ remap(mapVer, lsnr);
+ }
- if (log.isDebugEnabled())
- log.debug("Mvcc coordinator failed, need remap: " + e);
+ /** */
+ private void remap(@NotNull AffinityTopologyVersion mapVer, @NotNull MvccSnapshotResponseListener lsnr) {
+ if (log.isDebugEnabled())
+ log.debug("Mvcc coordinator failed or reassigned, need remap.");
- tryRemap(lsnr);
+ AffinityTopologyVersion topVer0 = topVer;
- return false;
- }
+ if (topVer0 != null && topVer0.compareTo(mapVer) >= 0)
+ mapVer = topVer0.nextMinorVersion();
- return true;
+ // Topology version can grow only.
+ assert topVer0 == null || mapVer.compareTo(topVer0) > 0 :
+ "topVer=" + topVer0 + ", nextTopVer=" + mapVer;
+
+ IgniteInternalFuture<AffinityTopologyVersion> readyFut =
+ cctx.shared().exchange().affinityReadyFuture(mapVer);
+
+ assert readyFut != null; // Cannot be null.
+
+ if (readyFut.isDone())
+ onAffinityReady(readyFut, lsnr);
+ else
+ readyFut.listen(fut -> onAffinityReady(fut, lsnr));
}
/** */
- private synchronized boolean checkDone() {
- if (!done)
- return done = true;
+ private void onAffinityReady(@NotNull IgniteInternalFuture<AffinityTopologyVersion> readyFut,
+ @NotNull MvccSnapshotResponseListener lsnr) {
+ try {
+ AffinityTopologyVersion mapVer = readyFut.get();
- return false;
+ if (log.isDebugEnabled())
+ log.debug("Remap on new topology: " + mapVer);
+
+ requestSnapshot0(mapVer, lsnr);
+ }
+ catch (IgniteCheckedException e) {
+ lsnr.onError(e);
+ }
+ }
+
+ /** */
+ private void ackQueryDone(MvccSnapshot snapshot) {
+ if (snapshot != null)
+ cctx.shared().coordinators().ackQueryDone(snapshot, id);
+ }
+
+ /** */
+ @NotNull private MvccCoordinator coordinator() {
+ return cctx.shared().coordinators().currentCoordinator();
}
/** {@inheritDoc} */
@@ -338,23 +304,25 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker {
}
/** */
- private final class ListenerDecorator implements MvccSnapshotResponseListener {
- /** */
- private final MvccSnapshotResponseListener lsnr;
-
+ private final class SnapshotFuture extends MvccSnapshotFuture {
/** */
- private ListenerDecorator(MvccSnapshotResponseListener lsnr) {
- this.lsnr = lsnr;
- }
+ private SnapshotFuture() {}
+ /** {@inheritDoc} */
@Override public void onResponse(MvccSnapshot res) {
if (onResponse0(res, this))
- lsnr.onResponse(res);
+ super.onResponse(res);
}
+ /** {@inheritDoc} */
@Override public void onError(IgniteCheckedException e) {
if (onError0(e, this))
- lsnr.onError(e);
+ super.onError(e);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean cancel() {
+ return onCancelled();
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index c6848d3..73e9bba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -23,7 +23,6 @@ import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.pagemem.PageMemory;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
@@ -137,10 +136,10 @@ public class MvccUtils {
if (isVisible(cctx, snapshot, mvccCrd, mvccCntr, MVCC_OP_COUNTER_NA, false))
return false;
- byte state = state(cctx, mvccCrd, mvccCntr, 0);
+ byte state;
- return state != TxState.COMMITTED && state != TxState.ABORTED
- || cctx.kernalContext().coordinators().hasLocalTransaction(mvccCrd, mvccCntr);
+ return cctx.kernalContext().coordinators().hasLocalTransaction(mvccCrd, mvccCntr) ||
+ (state = state(cctx, mvccCrd, mvccCntr, 0)) != TxState.COMMITTED && state != TxState.ABORTED;
}
/**
@@ -150,9 +149,8 @@ public class MvccUtils {
* @param mvccOpCntr Mvcc operation counter.
* @return TxState
* @see TxState
- * @throws IgniteCheckedException If failed.
*/
- public static byte state(GridCacheContext cctx, long mvccCrd, long mvccCntr, int mvccOpCntr) throws IgniteCheckedException {
+ public static byte state(GridCacheContext cctx, long mvccCrd, long mvccCntr, int mvccOpCntr) {
return state(cctx.kernalContext().coordinators(), mvccCrd, mvccCntr, mvccOpCntr);
}
@@ -163,9 +161,8 @@ public class MvccUtils {
* @param mvccOpCntr Mvcc operation counter.
* @return TxState
* @see TxState
- * @throws IgniteCheckedException If failed.
*/
- public static byte state(CacheGroupContext grp, long mvccCrd, long mvccCntr, int mvccOpCntr) throws IgniteCheckedException {
+ public static byte state(CacheGroupContext grp, long mvccCrd, long mvccCntr, int mvccOpCntr) {
return state(grp.shared().coordinators(), mvccCrd, mvccCntr, mvccOpCntr);
}
@@ -175,20 +172,21 @@ public class MvccUtils {
* @param mvccCntr Mvcc counter.
* @return TxState
* @see TxState
- * @throws IgniteCheckedException If failed.
*/
- private static byte state(MvccProcessor proc, long mvccCrd, long mvccCntr, int mvccOpCntr) throws IgniteCheckedException {
+ private static byte state(MvccProcessor proc, long mvccCrd, long mvccCntr, int mvccOpCntr) {
if (compare(INITIAL_VERSION, mvccCrd, mvccCntr, mvccOpCntr) == 0)
return TxState.COMMITTED; // Initial version is always committed;
if ((mvccOpCntr & MVCC_HINTS_MASK) != 0)
return (byte)(mvccOpCntr >>> MVCC_HINTS_BIT_OFF);
+ MvccCoordinator crd = proc.currentCoordinator();
+
byte state = proc.state(mvccCrd, mvccCntr);
if ((state == TxState.NA || state == TxState.PREPARED)
- && (proc.currentCoordinator() == null // Recovery from WAL.
- || mvccCrd < proc.currentCoordinator().coordinatorVersion()))
+ && (crd.unassigned() // Recovery from WAL.
+ || (crd.initialized() && mvccCrd < crd.version()))) // Stale row.
state = TxState.ABORTED;
return state;
@@ -242,9 +240,18 @@ public class MvccUtils {
if (mvccCrd > snapshotCrd)
return false; // Rows in the future are never visible.
- if (mvccCrd < snapshotCrd)
- // Don't check the row with TxLog if the row is expected to be committed.
- return !useTxLog || isCommitted(cctx, mvccCrd, mvccCntr, opCntr);
+ if (mvccCrd < snapshotCrd) {
+ if (!useTxLog)
+ return true; // The checking row is expected to be committed.
+
+ byte state = state(cctx, mvccCrd, mvccCntr, opCntr);
+
+ if (MVCC_MAX_SNAPSHOT.compareTo(snapshot) != 0 // Special version which sees all committed entries.
+ && state != TxState.COMMITTED && state != TxState.ABORTED)
+ throw unexpectedStateException(cctx, state, mvccCrd, mvccCntr, opCntr, snapshot);
+
+ return state == TxState.COMMITTED;
+ }
if (mvccCntr > snapshotCntr) // we don't see future updates
return false;
@@ -543,15 +550,6 @@ public class MvccUtils {
}
/**
- * @param topVer Topology version for cache operation.
- * @return Error.
- */
- public static ClusterTopologyServerNotFoundException noCoordinatorError(AffinityTopologyVersion topVer) {
- return new ClusterTopologyServerNotFoundException("Mvcc coordinator is not assigned for " +
- "topology version: " + topVer);
- }
-
- /**
* @return Error.
*/
public static ClusterTopologyServerNotFoundException noCoordinatorError() {
@@ -627,18 +625,6 @@ public class MvccUtils {
}
/**
- *
- * @param cctx Cache context.
- * @param mvccCrd Coordinator version.
- * @param mvccCntr Counter.
- * @return {@code True} in case the corresponding transaction is in {@code TxState.COMMITTED} state.
- * @throws IgniteCheckedException If failed.
- */
- private static boolean isCommitted(GridCacheContext cctx, long mvccCrd, long mvccCntr, int mvccOpCntr) throws IgniteCheckedException {
- return state(cctx, mvccCrd, mvccCntr, mvccOpCntr) == TxState.COMMITTED;
- }
-
- /**
* Throw an {@link UnsupportedOperationException} if this cache is transactional and MVCC is enabled with
* appropriate message about corresponding operation type.
* @param cctx Cache context.
@@ -809,7 +795,7 @@ public class MvccUtils {
if (tx == null)
tracker = new MvccQueryTrackerImpl(cctx);
- else if ((tracker = tx.mvccQueryTracker()) == null)
+ else
tracker = new StaticMvccQueryTracker(cctx, requestSnapshot(cctx, tx));
if (tracker.snapshot() == null)
@@ -827,6 +813,8 @@ public class MvccUtils {
*/
public static MvccSnapshot requestSnapshot(GridCacheContext cctx,
GridNearTxLocal tx) throws IgniteCheckedException {
+ assert tx != null;
+
MvccSnapshot snapshot;
tx = checkActive(tx);
@@ -834,11 +822,10 @@ public class MvccUtils {
if ((snapshot = tx.mvccSnapshot()) == null) {
MvccProcessor prc = cctx.shared().coordinators();
- snapshot = prc.tryRequestSnapshotLocal(tx);
+ snapshot = prc.requestWriteSnapshotLocal();
if (snapshot == null)
- // TODO IGNITE-7388
- snapshot = prc.requestSnapshotAsync(tx).get();
+ snapshot = prc.requestWriteSnapshotAsync().get();
tx.mvccSnapshot(snapshot);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java
index 52fb1db..95a1664 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java
@@ -20,9 +20,7 @@ package org.apache.ignite.internal.processors.cache.mvcc;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.jetbrains.annotations.NotNull;
/**
* Simple MVCC tracker used only as an Mvcc snapshot holder.
@@ -65,26 +63,11 @@ public class StaticMvccQueryTracker implements MvccQueryTracker {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshot(@NotNull final AffinityTopologyVersion topVer) {
- return new GridFinishedFuture<>(snapshot);
- }
-
- /** {@inheritDoc} */
- @Override public void requestSnapshot(@NotNull AffinityTopologyVersion topVer, @NotNull MvccSnapshotResponseListener lsnr) {
- lsnr.onResponse(snapshot);
- }
-
- /** {@inheritDoc} */
@Override public void onDone() {
// No-op.
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> onDone(@NotNull GridNearTxLocal tx, boolean commit) {
- throw new UnsupportedOperationException("Operation is not supported.");
- }
-
- /** {@inheritDoc} */
@Override public long onMvccCoordinatorChange(MvccCoordinator newCrd) {
return MVCC_TRACKER_ID_NA;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccWaitTxsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccWaitTxsRequest.java
deleted file mode 100644
index 5c6d4aa..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccWaitTxsRequest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc.msg;
-
-import java.nio.ByteBuffer;
-
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class MvccWaitTxsRequest implements MvccMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private long futId;
-
- /** */
- private GridLongList txs;
-
- /**
- *
- */
- public MvccWaitTxsRequest() {
- // No-op.
- }
-
- /**
- * @param futId Future ID.
- * @param txs Transactions to wait for.
- */
- public MvccWaitTxsRequest(long futId, GridLongList txs) {
- assert txs != null && !txs.isEmpty() : txs;
-
- this.futId = futId;
- this.txs = txs;
- }
-
- /**
- * @return Future ID.
- */
- public long futureId() {
- return futId;
- }
-
- /**
- * @return Transactions to wait for.
- */
- public GridLongList transactions() {
- return txs;
- }
-
- /** {@inheritDoc} */
- @Override public boolean waitForCoordinatorInit() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean processedFromNioThread() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeLong("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeMessage("txs", txs))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) {
- case 0:
- futId = reader.readLong("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- txs = reader.readMessage("txs");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(MvccWaitTxsRequest.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 142;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 2;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MvccWaitTxsRequest.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index ce36bab..51410ef 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2357,9 +2357,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false);
}
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
finally {
checkpointReadUnlock();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 9d11e76..9d4311a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -2058,20 +2058,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public GridLongList mvccUpdateNative(GridCacheContext cctx, boolean primary, KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime, MvccSnapshot mvccSnapshot) throws IgniteCheckedException {
- CacheDataStore delegate = init0(false);
-
- return delegate.mvccUpdateNative(cctx, primary, key, val, ver, expireTime, mvccSnapshot);
- }
-
- /** {@inheritDoc} */
- @Override public GridLongList mvccRemoveNative(GridCacheContext cctx, boolean primary, KeyCacheObject key, MvccSnapshot mvccSnapshot) throws IgniteCheckedException {
- CacheDataStore delegate = init0(false);
-
- return delegate.mvccRemoveNative(cctx, primary, key, mvccSnapshot);
- }
-
- /** {@inheritDoc} */
@Override public void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index b392feb..131bd72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1226,21 +1226,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
seal();
if (state == PREPARED || state == COMMITTED || state == ROLLED_BACK) {
- if (state == PREPARED) {
- try {
- cctx.tm().mvccPrepare(this);
- }
- catch (IgniteCheckedException e) {
- String msg = "Failed to update TxState: " + TxState.PREPARED;
-
- U.error(log, msg, e);
-
- throw new IgniteException(msg, e);
- }
- }
+ cctx.tm().setMvccState(this, toMvccState(state));
- if (!txState().mvccEnabled())
- ptr = cctx.tm().logTxRecord(this);
+ ptr = cctx.tm().logTxRecord(this);
}
}
}
@@ -1270,6 +1258,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** */
+ private byte toMvccState(TransactionState state) {
+ switch (state) {
+ case PREPARED:
+ return TxState.PREPARED;
+ case COMMITTED:
+ return TxState.COMMITTED;
+ case ROLLED_BACK:
+ return TxState.ABORTED;
+ default:
+ throw new IllegalStateException("Unexpected state: " + state);
+ }
+ }
+
+ /** */
private void recordStateChangedEvent(TransactionState state){
if (!near() || !local()) // Covers only GridNearTxLocal's state changes.
return;
@@ -1676,8 +1678,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
/*closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null,
resolveTaskName(),
null,
- keepBinary,
- null); // TODO IGNITE-7371
+ keepBinary);
}
boolean modified = false;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 9b9c7cc..b9a6697 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1777,8 +1777,7 @@ public class IgniteTxHandler {
/*transformClo*/null,
tx.resolveTaskName(),
/*expiryPlc*/null,
- /*keepBinary*/true,
- null); // TODO IGNITE-7371
+ /*keepBinary*/true);
if (val == null)
val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 0d78017..26350b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -60,7 +60,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -68,7 +67,6 @@ import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridTuple;
@@ -157,10 +155,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
protected CacheWriteSynchronizationMode syncMode;
/** */
- private GridLongList mvccWaitTxs;
-
- /** */
- private volatile boolean qryEnlisted;
+ protected volatile boolean qryEnlisted;
/**
* Empty constructor required for {@link Externalizable}.
@@ -222,10 +217,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
txState = implicitSingle ? new IgniteTxImplicitSingleStateImpl() : new IgniteTxStateImpl();
}
- public GridLongList mvccWaitTransactions() {
- return mvccWaitTxs;
- }
-
/**
* @return Transaction write synchronization mode.
*/
@@ -573,8 +564,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
cctx.tm().addCommittedTx(this);
if (!empty) {
- assert mvccWaitTxs == null;
-
batchStoreCommit(writeEntries());
WALPointer ptr = null;
@@ -747,17 +736,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer,
- null,
- mvccSnapshot());
+ null);
- if (updRes.success()) {
+ if (updRes.success())
txEntry.updateCounter(updRes.updateCounter());
- GridLongList waitTxs = updRes.mvccWaitTransactions();
-
- updateWaitTxs(waitTxs);
- }
-
if (updRes.loggedPointer() != null)
ptr = updRes.loggedPointer();
@@ -787,8 +770,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer0,
- null,
- mvccSnapshot())
+ null)
);
}
}
@@ -810,17 +792,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer,
- null,
- mvccSnapshot());
+ null);
- if (updRes.success()) {
+ if (updRes.success())
txEntry.updateCounter(updRes.updateCounter());
- GridLongList waitTxs = updRes.mvccWaitTransactions();
-
- updateWaitTxs(waitTxs);
- }
-
if (updRes.loggedPointer() != null)
ptr = updRes.loggedPointer();
@@ -845,8 +821,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
CU.subjectId(this, cctx),
resolveTaskName(),
dhtVer0,
- null,
- mvccSnapshot())
+ null)
);
}
}
@@ -979,18 +954,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
/**
- * @param waitTxs Tx ids to wait for.
- */
- private void updateWaitTxs(@Nullable GridLongList waitTxs) {
- if (waitTxs != null) {
- if (this.mvccWaitTxs == null)
- this.mvccWaitTxs = waitTxs;
- else
- this.mvccWaitTxs.addAll(waitTxs);
- }
- }
-
- /**
* Safely performs {@code updateClojure} operation on near cache entry with given {@code entryKey}.
* In case of {@link GridCacheEntryRemovedException} operation will be retried.
*
@@ -1225,8 +1188,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
null,
resolveTaskName(),
null,
- txEntry.keepBinary(),
- null); // TODO IGNITE-7371
+ txEntry.keepBinary());
}
}
else {
@@ -1682,19 +1644,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
/**
- * @param ver Mvcc version.
+ * Marks that there are entries, enlisted by query.
*/
- public void markQueryEnlisted(MvccSnapshot ver) {
- if (!qryEnlisted) {
- assert ver != null || mvccSnapshot != null;
+ public void markQueryEnlisted() {
+ assert mvccSnapshot != null && txState.mvccEnabled();
- if (mvccSnapshot == null)
- mvccSnapshot = ver;
+ if (!qryEnlisted) {
+ qryEnlisted = true;
- if(dht())
+ if (!cctx.localNode().isClient())
cctx.coordinators().registerLocalTransaction(mvccSnapshot.coordinatorVersion(), mvccSnapshot.counter());
-
- qryEnlisted = true;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 6c11cbc..e8d1a3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -74,7 +74,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOpti
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
-import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
@@ -319,16 +318,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * Rollback all active transactions with acquired Mvcc snapshot.
- */
- public void rollbackMvccTxOnCoordinatorChange() {
- for (IgniteInternalTx tx : activeTransactions()) {
- if (tx.mvccSnapshot() != null && tx instanceof GridNearTxLocal)
- ((GridNearTxLocal)tx).rollbackNearTxLocalAsync(false, false);
- }
- }
-
- /**
* @param cacheId Cache ID.
* @param txMap Transactions map.
*/
@@ -1443,6 +1432,35 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * Removes Tx from manager. Can be used only if there were no updates.
+ *
+ * @param tx Transaction to finish.
+ */
+ public void forgetTx(IgniteInternalTx tx) {
+ assert tx != null;
+
+ if (transactionMap(tx).remove(tx.xidVersion(), tx)) {
+ // 1. Remove from per-thread storage.
+ clearThreadMap(tx);
+
+ // 2. Unregister explicit locks.
+ if (!tx.alternateVersions().isEmpty())
+ for (GridCacheVersion ver : tx.alternateVersions())
+ idMap.remove(ver);
+
+ // 3. Remove Near-2-DHT mappings.
+ if (tx instanceof GridCacheMappedVersion)
+ mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion());
+
+ // 4. Clear context.
+ resetContext();
+
+ // 5. Complete finish future.
+ tx.state(UNKNOWN);
+ }
+ }
+
+ /**
* Tries to minimize damage from partially-committed transaction.
*
* @param tx Tx to uncommit.
@@ -2416,55 +2434,34 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * Marks MVCC transaction as {@link TxState#COMMITTED} or {@link TxState#ABORTED}.
+ * Sets MVCC state.
*
* @param tx Transaction.
- * @param commit Commit flag.
- * @throws IgniteCheckedException If failed to add version to TxLog.
+ * @param state New state.
*/
- public void mvccFinish(IgniteTxAdapter tx, boolean commit) throws IgniteCheckedException {
- if (!cctx.kernalContext().clientNode() && tx.mvccSnapshot != null && !(tx.near() && tx.remote())) {
- WALPointer ptr = null;
-
- cctx.database().checkpointReadLock();
-
- try {
- TxRecord rec;
- if (cctx.wal() != null && (rec = newTxRecord(tx)) != null)
- cctx.wal().log(rec);
+ public void setMvccState(IgniteInternalTx tx, byte state) {
+ if (cctx.kernalContext().clientNode() || tx.mvccSnapshot() == null || tx.near() && !tx.local())
+ return;
- cctx.coordinators().updateState(tx.mvccSnapshot, commit ? TxState.COMMITTED : TxState.ABORTED, tx.local());
- }
- finally {
- cctx.database().checkpointReadUnlock();
- }
+ cctx.database().checkpointReadLock();
- if (ptr != null)
- cctx.wal().flush(ptr, true);
+ try {
+ cctx.coordinators().updateState(tx.mvccSnapshot(), state, tx.local());
+ }
+ finally {
+ cctx.database().checkpointReadUnlock();
}
}
/**
- * Marks MVCC transaction as {@link TxState#PREPARED}.
- *
- * @param tx Transaction.
- * @throws IgniteCheckedException If failed to add version to TxLog.
+ * Finishes MVCC transaction.
+ * @param tx Transaction.
*/
- public void mvccPrepare(IgniteTxAdapter tx) throws IgniteCheckedException {
- if (!cctx.kernalContext().clientNode() && tx.mvccSnapshot != null && !(tx.near() && tx.remote())) {
- cctx.database().checkpointReadLock();
-
- try {
- TxRecord rec;
- if (cctx.wal() != null && (rec = newTxRecord(tx)) != null)
- cctx.wal().log(rec);
+ public void mvccFinish(IgniteTxAdapter tx) {
+ if (cctx.kernalContext().clientNode() || tx.mvccSnapshot == null || !tx.local())
+ return;
- cctx.coordinators().updateState(tx.mvccSnapshot, TxState.PREPARED);
- }
- finally {
- cctx.database().checkpointReadUnlock();
- }
- }
+ cctx.coordinators().releaseWaiters(tx.mvccSnapshot);
}
/**
@@ -2474,44 +2471,32 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @return WALPointer or {@code null} if nothing was logged.
*/
@Nullable WALPointer logTxRecord(IgniteTxAdapter tx) {
- // Log tx state change to WAL.
- if (cctx.wal() != null && logTxRecords) {
- TxRecord txRecord = newTxRecord(tx);
-
- if (txRecord != null) {
- try {
- return cctx.wal().log(txRecord);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to log TxRecord: " + txRecord, e);
+ BaselineTopology baselineTop;
- throw new IgniteException("Failed to log TxRecord: " + txRecord, e);
- }
- }
- }
+ // Log tx state change to WAL.
+ if (cctx.wal() == null
+ || (!logTxRecords && !tx.txState().mvccEnabled())
+ || (baselineTop = cctx.kernalContext().state().clusterState().baselineTopology()) == null
+ || !baselineTop.consistentIds().contains(cctx.localNode().consistentId()))
+ return null;
- return null;
- }
+ Map<Short, Collection<Short>> nodes = tx.consistentIdMapper.mapToCompactIds(tx.topVer, tx.txNodes, baselineTop);
- /**
- * Creates Tx state record for WAL.
- *
- * @param tx Transaction.
- * @return Tx state record.
- */
- private @Nullable TxRecord newTxRecord(IgniteTxAdapter tx) {
- BaselineTopology baselineTop = cctx.kernalContext().state().clusterState().baselineTopology();
+ TxRecord record;
- if (baselineTop != null && baselineTop.consistentIds().contains(cctx.localNode().consistentId())) {
- Map<Short, Collection<Short>> nodes = tx.consistentIdMapper.mapToCompactIds(tx.topVer, tx.txNodes, baselineTop);
+ if (tx.txState().mvccEnabled())
+ record = new MvccTxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes, tx.mvccSnapshot());
+ else
+ record = new TxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes);
- if (tx.txState().mvccEnabled())
- return new MvccTxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes, tx.mvccSnapshot());
- else
- return new TxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes);
+ try {
+ return cctx.wal().log(record);
}
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to log TxRecord: " + record, e);
- return null;
+ throw new IgniteException("Failed to log TxRecord: " + record, e);
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRowNative.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRowNative.java
deleted file mode 100644
index 38611a9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRowNative.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.tree.mvcc.data;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
-import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import org.apache.ignite.internal.processors.cache.tree.RowLinkIO;
-import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public class MvccUpdateDataRowNative extends MvccDataRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
- /** */
- private final MvccSnapshot mvccSnapshot;
- /** */
- private ResultType res;
- /** */
- private boolean canCleanup;
- /** */
- private GridLongList activeTxs;
- /** */
- private List<MvccLinkAwareSearchRow> cleanupRows;
- /** */
- private CacheDataRow oldRow;
-
- /**
- * @param key Key.
- * @param val Value.
- * @param ver Version.
- * @param expireTime Expire time.
- * @param mvccSnapshot MVCC snapshot.
- * @param newVer Update version.
- * @param part Partition.
- * @param cctx Cache context.
- */
- public MvccUpdateDataRowNative(
- KeyCacheObject key,
- CacheObject val,
- GridCacheVersion ver,
- long expireTime,
- MvccSnapshot mvccSnapshot,
- MvccVersion newVer,
- int part,
- GridCacheContext cctx) {
- super(key,
- val,
- ver,
- part,
- expireTime,
- cctx.cacheId(),
- mvccSnapshot,
- newVer);
-
- this.mvccSnapshot = mvccSnapshot;
- }
-
- /** {@inheritDoc} */
- @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree,
- BPlusIO<CacheSearchRow> io,
- long pageAddr,
- int idx)
- throws IgniteCheckedException {
- RowLinkIO rowIo = (RowLinkIO)io;
-
- // Assert version grows.
- assert assertVersion(rowIo, pageAddr, idx);
-
- boolean checkActive = mvccSnapshot.activeTransactions().size() > 0;
-
- boolean txActive = false;
-
- long rowCrdVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
-
- long crdVer = mvccCoordinatorVersion();
-
- boolean isFirstRmvd = false;
-
- if (res == null) {
- int cmp = Long.compare(crdVer, rowCrdVer);
-
- if (cmp == 0)
- cmp = Long.compare(mvccSnapshot.counter(), rowIo.getMvccCounter(pageAddr, idx));
-
- if (cmp == 0)
- res = ResultType.VERSION_FOUND;
- else {
- oldRow = tree.getRow(io, pageAddr, idx, RowData.LINK_WITH_HEADER);
-
- isFirstRmvd = oldRow.newMvccCoordinatorVersion() != 0;
-
- if (isFirstRmvd)
- res = ResultType.PREV_NULL;
- else
- res = ResultType.PREV_NOT_NULL;
- }
- }
-
- // Suppose transactions on previous coordinator versions are done.
- if (checkActive && crdVer == rowCrdVer) {
- long rowMvccCntr = rowIo.getMvccCounter(pageAddr, idx);
-
- long activeTx = isFirstRmvd ? oldRow.newMvccCounter() : rowMvccCntr;
-
- if (mvccSnapshot.activeTransactions().contains(activeTx)) {
- txActive = true;
-
- if (activeTxs == null)
- activeTxs = new GridLongList();
-
- activeTxs.add(activeTx);
- }
- }
-
- if (!txActive) {
- assert Long.compare(crdVer, rowCrdVer) >= 0;
-
- int cmp;
-
- long rowCntr = rowIo.getMvccCounter(pageAddr, idx);
-
- if (crdVer == rowCrdVer)
- cmp = Long.compare(mvccSnapshot.cleanupVersion(), rowCntr);
- else
- cmp = 1;
-
- if (cmp >= 0) {
- // Do not cleanup oldest version.
- if (canCleanup) {
- assert MvccUtils.mvccVersionIsValid(rowCrdVer, rowCntr);
-
- // Should not be possible to cleanup active tx.
- assert rowCrdVer != crdVer || !mvccSnapshot.activeTransactions().contains(rowCntr);
-
- if (cleanupRows == null)
- cleanupRows = new ArrayList<>();
-
- cleanupRows.add(new MvccLinkAwareSearchRow(cacheId, key, rowCrdVer, rowCntr,
- rowIo.getMvccOperationCounter(pageAddr, idx), rowIo.getLink(pageAddr, idx)));
- }
- else
- canCleanup = true;
- }
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public int mvccOperationCounter() {
- return MvccUtils.MVCC_START_OP_CNTR;
- }
-
- /**
- * @return Old row.
- */
- public CacheDataRow oldRow() {
- return oldRow;
- }
-
- /**
- * @return {@code True} if previous value was non-null.
- */
- public ResultType resultType() {
- return res == null ? ResultType.PREV_NULL : res;
- }
-
- /**
- * @return Active transactions to wait for.
- */
- @Nullable public GridLongList activeTransactions() {
- return activeTxs;
- }
-
- /**
- * @return Rows which are safe to cleanup.
- */
- public List<MvccLinkAwareSearchRow> cleanupRows() {
- return cleanupRows;
- }
-
- /**
- * @param io IO.
- * @param pageAddr Page address.
- * @param idx Item index.
- * @return Always {@code true}.
- */
- private boolean assertVersion(RowLinkIO io, long pageAddr, int idx) {
- long rowCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
- long rowCntr = io.getMvccCounter(pageAddr, idx);
-
- int cmp = Long.compare(mvccCoordinatorVersion(), rowCrdVer);
-
- if (cmp == 0)
- cmp = Long.compare(mvccSnapshot.counter(), rowCntr);
-
- // Can be equals if execute update on backup and backup already rebalanced value updated on primary.
- assert cmp >= 0 : "[updCrd=" + mvccCoordinatorVersion() +
- ", updCntr=" + mvccSnapshot.counter() +
- ", rowCrd=" + rowCrdVer +
- ", rowCntr=" + rowCntr + ']';
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MvccUpdateDataRowNative.class, this, "super", super.toString());
- }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 6e64011..834d1ed 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -408,8 +408,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
Object transformClo,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean keepBinary,
- MvccSnapshot mvccVer) {
+ boolean keepBinary) {
return val;
}
@@ -426,7 +425,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean keepBinary,
- MvccSnapshot mvccVer,
@Nullable ReaderArguments args) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert false;
@@ -444,7 +442,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean keepBinary,
- MvccSnapshot mvccVer,
@Nullable ReaderArguments readerArgs) {
assert false;
@@ -477,8 +474,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
UUID subjId,
String taskName,
@Nullable GridCacheVersion dhtVer,
- @Nullable Long updateCntr,
- MvccSnapshot mvccVer
+ @Nullable Long updateCntr
)
throws IgniteCheckedException, GridCacheEntryRemovedException
{
@@ -592,9 +588,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
UUID subjId,
String taskName,
@Nullable GridCacheVersion dhtVer,
- @Nullable Long updateCntr,
- MvccSnapshot mvccVer
- ) throws IgniteCheckedException, GridCacheEntryRemovedException {
+ @Nullable Long updateCntr
+ ) throws IgniteCheckedException, GridCacheEntryRemovedException {
obsoleteVer = ver;
val = null;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index aba526e..283d126 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -286,7 +286,6 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testCreateDestroyCachesMvccTxReplicated() throws Exception {
createDestroyCaches(REPLICATED, TRANSACTIONAL_SNAPSHOT);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java
index c7bee9d..02cd48c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java
@@ -36,7 +36,6 @@ public abstract class CacheMvccAbstractCoordinatorFailoverTest extends CacheMvcc
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testAccountsTxGet_Server_Backups0_CoordinatorFails_Persistence() throws Exception {
persistence = true;
@@ -48,7 +47,6 @@ public abstract class CacheMvccAbstractCoordinatorFailoverTest extends CacheMvcc
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testAccountsTxGet_SingleNode_CoordinatorFails() throws Exception {
accountsTxReadAll(1, 0, 0, 1,
@@ -100,7 +98,6 @@ public abstract class CacheMvccAbstractCoordinatorFailoverTest extends CacheMvcc
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testUpdate_N_Objects_Servers_Backups0__PutGet_CoordinatorFails_Persistence() throws Exception {
persistence = true;
@@ -112,7 +109,6 @@ public abstract class CacheMvccAbstractCoordinatorFailoverTest extends CacheMvcc
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testUpdate_N_Objects_SingleNode__PutGet_CoordinatorFails() throws Exception {
updateNObjectsTest(7, 1, 0, 0, 1, DFLT_TEST_TIME,
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedCoordinatorFailoverTest.java
index dcaf720..0780b00 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedCoordinatorFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedCoordinatorFailoverTest.java
@@ -46,7 +46,6 @@ public class CacheMvccPartitionedCoordinatorFailoverTest extends CacheMvccAbstra
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testAccountsTxGet_ClientServer_Backups2_CoordinatorFails_Persistence() throws Exception {
persistence = true;
@@ -58,7 +57,6 @@ public class CacheMvccPartitionedCoordinatorFailoverTest extends CacheMvccAbstra
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testAccountsTxGet_Server_Backups1_CoordinatorFails() throws Exception {
accountsTxReadAll(2, 0, 1, DFLT_PARTITION_COUNT,
@@ -68,7 +66,6 @@ public class CacheMvccPartitionedCoordinatorFailoverTest extends CacheMvccAbstra
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10750")
@Test
public void testAccountsTxScan_ClientServer_Backups2_CoordinatorFails() throws Exception {
accountsTxReadAll(4, 2, 2, DFLT_PARTITION_COUNT,
@@ -78,7 +75,6 @@ public class CacheMvccPartitionedCoordinatorFailoverTest extends CacheMvccAbstra
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testAccountsTxScan_Server_Backups1_CoordinatorFails_Persistence() throws Exception {
persistence = true;
@@ -110,7 +106,6 @@ public class CacheMvccPartitionedCoordinatorFailoverTest extends CacheMvccAbstra
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testUpdate_N_Objects_ClientServer_Backups1_PutGet_CoordinatorFails_Persistence() throws Exception {
persistence = true;
@@ -133,7 +128,6 @@ public class CacheMvccPartitionedCoordinatorFailoverTest extends CacheMvccAbstra
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testGetReadInProgressCoordinatorFails() throws Exception {
readInProgressCoordinatorFails(false, false, PESSIMISTIC, REPEATABLE_READ, GET, PUT, null);
@@ -158,7 +152,6 @@ public class CacheMvccPartitionedCoordinatorFailoverTest extends CacheMvccAbstra
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testGetReadInsideTxInProgressCoordinatorFails_ReadDelay() throws Exception {
readInProgressCoordinatorFails(true, true, PESSIMISTIC, REPEATABLE_READ, GET, PUT, null);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index acb7891..9e115be 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -73,7 +73,6 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryC
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -2536,7 +2535,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testMvccCoordinatorChangeSimple() throws Exception {
Ignite srv0 = startGrid(0);
@@ -3303,7 +3301,9 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
MvccProcessorImpl crd = mvccProcessor(node);
// Start query to prevent cleanup.
- IgniteInternalFuture<MvccSnapshot> fut = crd.requestSnapshotAsync((IgniteInternalTx)null);
+ MvccSnapshotFuture fut = new MvccSnapshotFuture();
+
+ crd.requestReadSnapshotAsync(crd.currentCoordinator(), fut);
fut.get();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java
index 6d37b9b..4836d3f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java
@@ -77,7 +77,6 @@ public class DataStreamProcessorMvccSelfTest extends DataStreamProcessorSelfTest
}
/** {@inheritDoc} */
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
@Override public void testTryFlush() throws Exception {
super.testTryFlush();
diff --git a/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteNativeIoLocalWalModeChangeDuringRebalancingSelfTest.java b/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteNativeIoLocalWalModeChangeDuringRebalancingSelfTest.java
index f429c50..a92f6d1 100644
--- a/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteNativeIoLocalWalModeChangeDuringRebalancingSelfTest.java
+++ b/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteNativeIoLocalWalModeChangeDuringRebalancingSelfTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.ignite.internal.processors.cache.persistence;
-import org.apache.ignite.testframework.MvccFeatureChecker;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -35,8 +34,7 @@ public class IgniteNativeIoLocalWalModeChangeDuringRebalancingSelfTest extends L
/** {@inheritDoc} */
@Test
@Override public void testWithExchangesMerge() throws Exception {
- if (MvccFeatureChecker.forcedMvcc())
- fail("https://issues.apache.org/jira/browse/IGNITE-10752");
+
super.testWithExchangesMerge();
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DhtResultSetEnlistFuture.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DhtResultSetEnlistFuture.java
index 7f22107..375789f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DhtResultSetEnlistFuture.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DhtResultSetEnlistFuture.java
@@ -51,7 +51,7 @@ public class DhtResultSetEnlistFuture extends GridDhtTxQueryAbstractEnlistFuture
public DhtResultSetEnlistFuture(UUID nearNodeId, GridCacheVersion nearLockVer,
MvccSnapshot mvccSnapshot, long threadId, IgniteUuid nearFutId, int nearMiniId, @Nullable int[] parts,
GridDhtTxLocalAdapter tx, long timeout, GridCacheContext<?, ?> cctx, ResultSet rs) {
- super(nearNodeId, nearLockVer, mvccSnapshot, threadId, nearFutId, nearMiniId, parts, tx, timeout, cctx);
+ super(nearNodeId, nearLockVer, mvccSnapshot, threadId, nearFutId, nearMiniId, tx, timeout, cctx);
this.rs = rs;
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 907152a..4c3b2ce 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -58,12 +58,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.CompoundLockFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -75,9 +75,9 @@ import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.ResultSetEnlistFuture;
import org.apache.ignite.internal.processors.query.h2.UpdateResult;
-import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
+import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
@@ -111,10 +111,10 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUE
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
+import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
+import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages;
/**
@@ -605,7 +605,8 @@ public class GridMapQueryExecutor {
txReq.threadId(),
txReq.timeout(),
txReq.subjectId(),
- txReq.taskNameHash());
+ txReq.taskNameHash(),
+ req.mvccSnapshot());
}
else {
tx = MvccUtils.tx(ctx, txReq.version());
@@ -984,7 +985,7 @@ public class GridMapQueryExecutor {
if (inTx) {
if (tx.dht() && (runCntr == null || runCntr.decrementAndGet() == 0)) {
if (removeMapping = tx.empty() && !tx.queryEnlisted())
- tx.rollbackAsync().get();
+ ctx.cache().context().tm().forgetTx(tx);
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java
index e59018e..2ab9c62 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java
@@ -116,7 +116,6 @@ public abstract class CacheMvccAbstractSqlCoordinatorFailoverTest extends CacheM
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testUpdate_N_Objects_ClientServer_Backups0_Sql_Persistence() throws Exception {
persistence = true;
@@ -128,7 +127,6 @@ public abstract class CacheMvccAbstractSqlCoordinatorFailoverTest extends CacheM
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testUpdate_N_Objects_SingleNode_Sql_Persistence() throws Exception {
updateNObjectsTest(3, 1, 0, 0, 1, DFLT_TEST_TIME,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientReconnectTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientReconnectTest.java
index ec62221..f201aae 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientReconnectTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientReconnectTest.java
@@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.mvcc;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
-import org.junit.Ignore;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
@@ -33,14 +32,12 @@ public class CacheMvccContinuousQueryClientReconnectTest extends IgniteCacheCon
}
/** {@inheritDoc} */
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10537")
@Test
@Override public void testReconnectClient() throws Exception {
super.testReconnectClient();
}
/** {@inheritDoc} */
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10537")
@Test
@Override public void testReconnectClientAndLeftRouter() throws Exception {
super.testReconnectClientAndLeftRouter();
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
index 0cabf65..3256f26 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
@@ -52,7 +52,6 @@ public class CacheMvccPartitionedSqlCoordinatorFailoverTest extends CacheMvccAbs
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testAccountsTxSql_Server_Backups1_CoordinatorFails_Persistence() throws Exception {
persistence = true;
@@ -64,7 +63,6 @@ public class CacheMvccPartitionedSqlCoordinatorFailoverTest extends CacheMvccAbs
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testPutAllGetAll_ClientServer_Backups3_RestartCoordinator_ScanDml() throws Exception {
putAllGetAll(RestartMode.RESTART_CRD , 5, 2, 3, DFLT_PARTITION_COUNT,
@@ -132,7 +130,6 @@ public class CacheMvccPartitionedSqlCoordinatorFailoverTest extends CacheMvccAbs
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testPutAllGetAll_Server_Backups1_SinglePartition_RestartRandomSrv_SqlDml() throws Exception {
putAllGetAll(RestartMode.RESTART_RND_SRV, 4, 0, 1, 1,
@@ -142,7 +139,6 @@ public class CacheMvccPartitionedSqlCoordinatorFailoverTest extends CacheMvccAbs
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testPutAllGetAll_ClientServer_Backups1_SinglePartition_RestartRandomSrv_SqlDml() throws Exception {
putAllGetAll(RestartMode.RESTART_RND_SRV, 3, 1, 1, 1,
@@ -152,7 +148,6 @@ public class CacheMvccPartitionedSqlCoordinatorFailoverTest extends CacheMvccAbs
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testUpdate_N_Objects_ClientServer_Backups2_Sql() throws Exception {
updateNObjectsTest(7, 3, 2, 2, DFLT_PARTITION_COUNT, DFLT_TEST_TIME,
@@ -162,7 +157,6 @@ public class CacheMvccPartitionedSqlCoordinatorFailoverTest extends CacheMvccAbs
/**
* @throws Exception If failed.
*/
- @Ignore("https://issues.apache.org/jira/browse/IGNITE-10752")
@Test
public void testUpdate_N_Objects_ClientServer_Backups1_Sql_Persistence() throws Exception {
persistence = true;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
index 15f8ca8..8b8c2b3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
@@ -29,7 +29,6 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -90,7 +89,7 @@ public class GridIndexRebuildWithMvccEnabledSelfTest extends GridIndexRebuildSel
* @throws IgniteCheckedException if failed.
*/
private static void lockVersion(IgniteEx node) throws IgniteCheckedException {
- node.context().coordinators().requestSnapshotAsync((IgniteInternalTx)null).get();
+ node.context().coordinators().requestReadSnapshotAsync().get();
}
/** {@inheritDoc} */