You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/12 15:02:49 UTC
ignite git commit: ignite-5932
Repository: ignite
Updated Branches:
refs/heads/ignite-5932 b73792aec -> 1b272cbfa
ignite-5932
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b272cbf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b272cbf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b272cbf
Branch: refs/heads/ignite-5932
Commit: 1b272cbfa320721ca7e1deef83441168e86aadce
Parents: b73792a
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 12 14:42:29 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Oct 12 18:02:37 2017 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtCacheAdapter.java | 12 +-
.../distributed/dht/GridDhtGetSingleFuture.java | 13 +-
.../dht/GridPartitionedSingleGetFuture.java | 7 +-
.../GridDhtPartitionsExchangeFuture.java | 29 +-
.../distributed/near/GridNearGetRequest.java | 8 +-
.../near/GridNearSingleGetRequest.java | 41 ++-
.../cache/distributed/near/GridNearTxLocal.java | 2 +-
.../cache/mvcc/CacheCoordinatorsProcessor.java | 25 +-
.../mvcc/CoordinatorActiveQueriesMessage.java | 62 ++++
.../processors/cache/mvcc/MvccQueryTracker.java | 16 +-
.../cache/mvcc/PreviousCoordinatorQueries.java | 12 +-
.../cache/mvcc/CacheMvccTransactionsTest.java | 298 +++++++++++++++++--
.../testsuites/IgniteCacheMvccTestSuite.java | 42 +++
13 files changed, 497 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 5dbb3a8..e1c5379 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -848,9 +848,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
* @param taskNameHash Task name hash.
* @param expiry Expiry.
* @param skipVals Skip vals flag.
+ * @param mvccVer Mvcc version.
* @return Future for the operation.
*/
- public GridDhtGetSingleFuture getDhtSingleAsync(
+ GridDhtGetSingleFuture getDhtSingleAsync(
UUID nodeId,
long msgId,
KeyCacheObject key,
@@ -861,7 +862,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
int taskNameHash,
@Nullable IgniteCacheExpiryPolicy expiry,
boolean skipVals,
- boolean recovery
+ boolean recovery,
+ MvccCoordinatorVersion mvccVer
) {
GridDhtGetSingleFuture fut = new GridDhtGetSingleFuture<>(
ctx,
@@ -875,7 +877,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
taskNameHash,
expiry,
skipVals,
- recovery);
+ recovery,
+ mvccVer);
fut.init();
@@ -903,7 +906,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
req.taskNameHash(),
expiryPlc,
req.skipValues(),
- req.recovery());
+ req.recovery(),
+ req.mvccVersion());
fut.listen(new CI1<IgniteInternalFuture<GridCacheEntryInfo>>() {
@Override public void apply(IgniteInternalFuture<GridCacheEntryInfo> f) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index 9fb4b0a..7462406 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.ReaderArguments;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
@@ -103,6 +104,9 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
/** Recovery context flag. */
private final boolean recovery;
+ /** */
+ private final MvccCoordinatorVersion mvccVer;
+
/**
* @param cctx Context.
* @param msgId Message ID.
@@ -115,6 +119,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
* @param taskNameHash Task name hash code.
* @param expiryPlc Expiry policy.
* @param skipVals Skip values flag.
+ * @param mvccVer Mvcc version.
*/
public GridDhtGetSingleFuture(
GridCacheContext<K, V> cctx,
@@ -128,7 +133,8 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
int taskNameHash,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean skipVals,
- boolean recovery
+ boolean recovery,
+ @Nullable MvccCoordinatorVersion mvccVer
) {
assert reader != null;
assert key != null;
@@ -145,6 +151,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
this.expiryPlc = expiryPlc;
this.skipVals = skipVals;
this.recovery = recovery;
+ this.mvccVer = mvccVer;
futId = IgniteUuid.randomUuid();
@@ -366,7 +373,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
expiryPlc,
skipVals,
recovery,
- null); // TODO IGNITE-3478
+ mvccVer);
}
else {
final ReaderArguments args = readerArgs;
@@ -392,7 +399,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
expiryPlc,
skipVals,
recovery,
- null); // TODO IGNITE-3478
+ mvccVer);
fut0.listen(createGetFutureListener());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index afef744..c31b8b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -237,7 +237,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
taskName == null ? 0 : taskName.hashCode(),
expiryPlc,
skipVals,
- recovery);
+ recovery,
+ mvccVer);
final Collection<Integer> invalidParts = fut.invalidPartitions();
@@ -282,7 +283,6 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
cctx.mvcc().addFuture(this, futId);
}
- // TODO IGNITE-3478.
GridCacheMessage req = new GridNearSingleGetRequest(cctx.cacheId(),
futId.localId(),
key,
@@ -296,7 +296,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
/*add reader*/false,
needVer,
cctx.deploymentEnabled(),
- recovery);
+ recovery,
+ mvccVer);
try {
cctx.io().send(node, req, cctx.ioPolicy());
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a8b3dbc..bc38d5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -76,11 +76,14 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryAware;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
@@ -657,7 +660,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
- updateTopologies(crdNode, cctx.coordinators().currentCoordinator());
+ updateTopologies(crd, crdNode, cctx.coordinators().currentCoordinator());
switch (exchange) {
case ALL: {
@@ -760,11 +763,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * @param exchCrd Exchange coordinator node.
* @param crd Coordinator flag.
* @param mvccCrd Mvcc coordinator.
* @throws IgniteCheckedException If failed.
*/
- private void updateTopologies(boolean crd, MvccCoordinator mvccCrd) throws IgniteCheckedException {
+ private void updateTopologies(ClusterNode exchCrd, boolean crd, MvccCoordinator mvccCrd) throws IgniteCheckedException {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal())
continue;
@@ -813,7 +817,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
processMvccCoordinatorChange(mvccCrd, (MvccQueryAware)fut, activeQrys);
}
+ for (IgniteInternalTx tx : cctx.tm().activeTransactions()) {
+ if (tx instanceof GridNearTxLocal) {
+ MvccQueryTracker qryTracker = ((GridNearTxLocal)tx).mvccQueryTracker();
+
+ if (qryTracker != null)
+ processMvccCoordinatorChange(mvccCrd, qryTracker, activeQrys);
+ }
+ }
+
exchCtx.addActiveQueries(cctx.localNodeId(), activeQrys);
+
+ if (exchCrd == null || !mvccCrd.nodeId().equals(exchCrd.id()))
+ cctx.coordinators().sendActiveQueries(mvccCrd.nodeId(), activeQrys);
}
}
@@ -824,8 +840,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
*/
private void processMvccCoordinatorChange(MvccCoordinator mvccCrd,
MvccQueryAware qryAware,
- Map<MvccCounter, Integer> activeQrys
- )
+ Map<MvccCounter, Integer> activeQrys)
{
MvccCoordinatorVersion ver = qryAware.onMvccCoordinatorChange(mvccCrd);
@@ -1300,9 +1315,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
msg.partitionHistoryCounters(partHistReserved0);
}
- Map<UUID, Map<MvccCounter, Integer>> activeQueries = exchCtx.activeQueries();
+ if (exchCtx.newMvccCoordinator() && cctx.coordinators().currentCoordinatorId().equals(node.id())) {
+ Map<UUID, Map<MvccCounter, Integer>> activeQueries = exchCtx.activeQueries();
- msg.activeQueries(activeQueries != null ? activeQueries.get(cctx.localNodeId()) : null);
+ msg.activeQueries(activeQueries != null ? activeQueries.get(cctx.localNodeId()) : null);
+ }
if (stateChangeExchange() && changeGlobalStateE != null)
msg.setError(changeGlobalStateE);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index c6f3280..ab927d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -46,6 +46,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Get request. Responsible for obtaining entry from primary node. 'Near' means 'Primary' here, not 'Near Cache'.
@@ -132,6 +133,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
* @param createTtl New TTL to set after entry is created, -1 to leave unchanged.
* @param accessTtl New TTL to set after entry is accessed, -1 to leave unchanged.
* @param addDepInfo Deployment info.
+ * @param mvccVer Mvcc version.
*/
public GridNearGetRequest(
int cacheId,
@@ -149,7 +151,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
boolean skipVals,
boolean addDepInfo,
boolean recovery,
- MvccCoordinatorVersion mvccVer
+ @Nullable MvccCoordinatorVersion mvccVer
) {
assert futId != null;
assert miniId != null;
@@ -194,9 +196,9 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
}
/**
- * @return Counter.
+ * @return Mvcc version.
*/
- public MvccCoordinatorVersion mvccVersion() {
+ @Nullable public MvccCoordinatorVersion mvccVersion() {
return mvccVer;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
index 00ff4bb..104a31a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@ -26,11 +26,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
*
@@ -81,6 +83,9 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
/** TTL for read operation. */
private long accessTtl;
+ /** */
+ private MvccCoordinatorVersion mvccVer;
+
/**
* Empty constructor required for {@link Message}.
*/
@@ -103,6 +108,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
* @param addReader Add reader flag.
* @param needVer {@code True} if entry version is needed.
* @param addDepInfo Deployment info.
+ * @param mvccVer Mvcc version.
*/
public GridNearSingleGetRequest(
int cacheId,
@@ -118,7 +124,8 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
boolean addReader,
boolean needVer,
boolean addDepInfo,
- boolean recovery
+ boolean recovery,
+ MvccCoordinatorVersion mvccVer
) {
assert key != null;
@@ -131,6 +138,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
this.createTtl = createTtl;
this.accessTtl = accessTtl;
this.addDepInfo = addDepInfo;
+ this.mvccVer = mvccVer;
if (readThrough)
flags |= READ_THROUGH_FLAG_MASK;
@@ -149,6 +157,13 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
}
/**
+ * @return Mvcc version.
+ */
+ @Nullable public MvccCoordinatorVersion mvccVersion() {
+ return mvccVer;
+ }
+
+ /**
* @return Key.
*/
public KeyCacheObject key() {
@@ -322,7 +337,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
reader.incrementState();
case 8:
- subjId = reader.readUuid("subjId");
+ mvccVer = reader.readMessage("mvccVer");
if (!reader.isLastRead())
return false;
@@ -330,7 +345,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
reader.incrementState();
case 9:
- taskNameHash = reader.readInt("taskNameHash");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -338,6 +353,14 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
reader.incrementState();
case 10:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -396,18 +419,24 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
writer.incrementState();
case 8:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeMessage("mvccVer", mvccVer))
return false;
writer.incrementState();
case 9:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 10:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -430,7 +459,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 11;
+ return 12;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 51d842c..5df4cca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -237,7 +237,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
trackTimeout = cctx.time().addTimeoutObject(this);
}
- MvccQueryTracker mvccQueryTracker() {
+ public MvccQueryTracker mvccQueryTracker() {
return mvccTracker;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index a5a9b0a..636634c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -471,6 +471,12 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
return fut;
}
+ /**
+ * @param futId Future ID.
+ * @param updateVer Update version.
+ * @param readVer Optional read version.
+ * @return Message.
+ */
private CoordinatorAckRequestTx createTxAckMessage(long futId,
MvccCoordinatorVersion updateVer,
@Nullable MvccCoordinatorVersion readVer)
@@ -952,6 +958,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
/**
+ * @param nodeId Node ID.
* @param msg Message.
*/
private void processCoordinatorWaitTxsRequest(final UUID nodeId, final CoordinatorWaitTxsRequest msg) {
@@ -999,8 +1006,8 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
/**
- * @param nodeId
- * @param msg
+ * @param nodeId Node ID.
+ * @param msg Message.
*/
private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest msg) {
try {
@@ -1019,18 +1026,21 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
/**
- * @return
+ * @return Coordinator.
*/
public MvccCoordinator currentCoordinator() {
return curCrd;
}
+ /**
+ * @param curCrd Coordinator.
+ */
public void currentCoordinator(MvccCoordinator curCrd) {
this.curCrd = curCrd;
}
/**
- * @return
+ * @return Current coordinator node ID.
*/
public UUID currentCoordinatorId() {
MvccCoordinator curCrd = this.curCrd;
@@ -1062,6 +1072,13 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
/**
+ * @param activeQueries
+ */
+ public void sendActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> activeQueries) {
+
+ }
+
+ /**
* @param topVer Topology version.
* @param discoCache Discovery data.
* @param activeQueries Current queries.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
new file mode 100644
index 0000000..5032593
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CoordinatorActiveQueriesMessage implements MvccCoordinatorMessage {
+ /** */
+ @GridDirectMap(keyType = Message.class, valueType = Integer.class)
+ private Map<MvccCounter, Integer> activeQrys;
+
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ @Override public boolean processedFromNioThread() {
+ return true;
+ }
+
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ return false;
+ }
+
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ return false;
+ }
+
+ @Override public short directType() {
+ return 0;
+ }
+
+ @Override public byte fieldsCount() {
+ return 0;
+ }
+
+ @Override public void onAckReceived() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index 0e3eb7b..24d6978 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -33,7 +33,7 @@ import org.jetbrains.annotations.Nullable;
* TODO IGNITE-3478: make sure clean up is called when related future is forcibly finished, i.e. on cache stop
* TODO IGNITE-3478: support remap to new coordinator.
*/
-public class MvccQueryTracker {
+public class MvccQueryTracker implements MvccQueryAware {
/** */
private MvccCoordinator mvccCrd;
@@ -64,6 +64,16 @@ public class MvccQueryTracker {
this.lsnr = lsnr;
}
+ @Override
+ public void onMvccVersionReceived(AffinityTopologyVersion topVer) {
+
+ }
+
+ @Override
+ public void onMvccVersionError(IgniteCheckedException e) {
+
+ }
+
/**
* @return Requested mvcc version.
*/
@@ -147,9 +157,9 @@ public class MvccQueryTracker {
}
else {
if (commit)
- return ctx.coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(), mvccInfo.version(), null);
+ return ctx.coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(), mvccInfo.version(), mvccVer0);
else
- ctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), mvccInfo.version(), null);
+ ctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), mvccInfo.version(), mvccVer0);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
index 667865b..b3fc98d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -51,11 +51,11 @@ class PreviousCoordinatorQueries {
private boolean initDone;
/**
- * @param srvNodesQueries Active queries started on server nodes.
+ * @param nodeQueries Active queries map.
* @param discoCache Discovery data.
* @param mgr Discovery manager.
*/
- void init(Map<UUID, Map<MvccCounter, Integer>> srvNodesQueries, DiscoCache discoCache, GridDiscoveryManager mgr) {
+ void init(Map<UUID, Map<MvccCounter, Integer>> nodeQueries, DiscoCache discoCache, GridDiscoveryManager mgr) {
synchronized (this) {
assert !initDone;
assert waitNodes == null;
@@ -63,14 +63,16 @@ class PreviousCoordinatorQueries {
waitNodes = new HashSet<>();
for (ClusterNode node : discoCache.allNodes()) {
- if (CU.clientNode(node) && mgr.alive(node) && !F.contains(rcvd, node.id()))
+ if ((nodeQueries == null || !nodeQueries.containsKey(node.id())) &&
+ mgr.alive(node) &&
+ !F.contains(rcvd, node.id()))
waitNodes.add(node.id());
}
initDone = waitNodes.isEmpty();
- if (srvNodesQueries != null) {
- for (Map.Entry<UUID, Map<MvccCounter, Integer>> e : srvNodesQueries.entrySet())
+ if (nodeQueries != null) {
+ for (Map.Entry<UUID, Map<MvccCounter, Integer>> e : nodeQueries.entrySet())
addAwaitedActiveQueries(e.getKey(), e.getValue());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 70b910b..87fe137 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
@@ -406,7 +407,91 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testSimplePutGetAll() throws Exception {
+ public void testTxReadSnapshotSimple() throws Exception {
+ Ignite srv0 = startGrids(4);
+
+ client = true;
+
+ startGrid(4);
+
+ for (CacheConfiguration ccfg : cacheConfigurations()) {
+ IgniteCache<Object, Object> cache0 = srv0.createCache(ccfg);
+
+ final Map<Integer, Integer> startVals = new HashMap<>();
+
+ final int KEYS = 10;
+
+ for (int i = 0; i < KEYS; i++)
+ startVals.put(i, 0);
+
+ for (final Ignite node : G.allGrids()) {
+ info("Test node: " + node.name());
+
+ try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache0.putAll(startVals);
+
+ tx.commit();
+ }
+
+ final CountDownLatch readStart = new CountDownLatch(1);
+
+ final CountDownLatch readProceed = new CountDownLatch(1);
+
+ IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ IgniteCache<Object, Object> cache = node.cache(DEFAULT_CACHE_NAME);
+
+ try (Transaction tx = node.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+ assertEquals(0, cache.get(0));
+
+ readStart.countDown();
+
+ assertTrue(readProceed.await(5, TimeUnit.SECONDS));
+
+ assertEquals(0, cache.get(1));
+
+ assertEquals(0, cache.get(2));
+
+ Map<Object, Object> res = cache.getAll(startVals.keySet());
+
+ assertEquals(startVals.size(), res.size());
+
+ for (Map.Entry<Object, Object> e : res.entrySet())
+ assertEquals("Invalid value for key: " + e.getKey(), 0, e.getValue());
+
+ tx.rollback();
+ }
+
+ return null;
+ }
+ });
+
+ assertTrue(readStart.await(5, TimeUnit.SECONDS));
+
+ for (int i = 0; i < KEYS; i++) {
+ try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ if (i % 2 == 0)
+ cache0.put(i, 1);
+ else
+ cache0.remove(i);
+
+ tx.commit();
+ }
+ }
+
+ readProceed.countDown();
+
+ fut.get();
+ }
+
+ srv0.destroyCache(cache0.getName());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutGetAllSimple() throws Exception {
Ignite node = startGrid(0);
IgniteTransactions txs = node.transactions();
@@ -465,22 +550,22 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testSimplePutRemove() throws Exception {
- simplePutRemove(false);
+ public void testPutRemoveSimple() throws Exception {
+ putRemoveSimple(false);
}
/**
* @throws Exception If failed.
*/
- public void testSimplePutRemove_LargeKeys() throws Exception {
- simplePutRemove(true);
+ public void testPutRemoveSimple_LargeKeys() throws Exception {
+ putRemoveSimple(true);
}
/**
* @throws Exception If failed.
* @param largeKeys {@code True} to use large keys (not fitting in single page).
*/
- private void simplePutRemove(boolean largeKeys) throws Exception {
+ private void putRemoveSimple(boolean largeKeys) throws Exception {
Ignite node = startGrid(0);
IgniteTransactions txs = node.transactions();
@@ -881,9 +966,11 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
for (boolean otherPuts : vals) {
for (boolean putOnStart : vals) {
- cleanupWaitsForGet1(otherPuts, putOnStart);
+ for (boolean inTx : vals) {
+ cleanupWaitsForGet1(otherPuts, putOnStart, inTx);
- afterTest();
+ afterTest();
+ }
}
}
}
@@ -891,10 +978,13 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @param otherPuts {@code True} to update unrelated keys to increment mvcc counter.
* @param putOnStart {@code True} to put data in cache before getAll.
+ * @param inTx {@code True} to read inside transaction.
* @throws Exception If failed.
*/
- private void cleanupWaitsForGet1(boolean otherPuts, final boolean putOnStart) throws Exception {
- info("cleanupWaitsForGet [otherPuts=" + otherPuts + ", putOnStart=" + putOnStart + "]");
+ private void cleanupWaitsForGet1(boolean otherPuts, final boolean putOnStart, final boolean inTx) throws Exception {
+ info("cleanupWaitsForGet [otherPuts=" + otherPuts +
+ ", putOnStart=" + putOnStart +
+ ", inTx=" + inTx + "]");
testSpi = true;
@@ -941,7 +1031,18 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
@Override public Void call() throws Exception {
IgniteCache<Integer, Integer> cache = client.cache(srvCache.getName());
- Map<Integer, Integer> vals = cache.getAll(F.asSet(key1, key2));
+
+ Map<Integer, Integer> vals;
+
+ if (inTx) {
+ try (Transaction tx = client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+ vals = cache.getAll(F.asSet(key1, key2));
+
+ tx.rollback();
+ }
+ }
+ else
+ vals = cache.getAll(F.asSet(key1, key2));
if (putOnStart) {
assertEquals(2, vals.size());
@@ -1713,11 +1814,32 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testPessimisticTxReadsSnapshot_ClientServer() throws Exception {
+ txReadsSnapshot(4, 2, 1, 64, true); // 4 servers, 2 clients, 1 backup; 64 is presumably the partition count, 'true' presumably selects pessimistic tx - TODO confirm.
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticTxReadsSnapshot_SingleNode() throws Exception {
+ txReadsSnapshot(1, 0, 0, 64, false); // 1 server, no clients, no backups; 64 is presumably the partition count, 'false' presumably selects optimistic tx - TODO confirm.
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testOptimisticTxReadsSnapshot_SingleNode_SinglePartition() throws Exception {
txReadsSnapshot(1, 0, 0, 1, false);
}
/**
+ * @throws Exception If failed.
+ */
+ public void testOptimisticTxReadsSnapshot_ClientServer() throws Exception {
+ txReadsSnapshot(4, 2, 1, 64, false); // 4 servers, 2 clients, 1 backup; 64 is presumably the partition count, 'false' presumably selects optimistic tx - TODO confirm.
+ }
+
+ /**
* @param srvs Number of server nodes.
* @param clients Number of client nodes.
* @param cacheBackups Number of cache backups.
@@ -1834,7 +1956,6 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
int remaining = ACCOUNTS;
do {
- // TODO IGNITE-3478: add single get usage.
int readCnt = rnd.nextInt(remaining) + 1;
Set<Integer> readKeys = new TreeSet<>();
@@ -1866,16 +1987,29 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
do {
int readCnt = rnd.nextInt(remaining) + 1;
- Set<Integer> readKeys = new LinkedHashSet<>();
+ if (rnd.nextInt(3) == 0) {
+ for (int i = 0; i < readCnt; i++) {
+ Integer key = rnd.nextInt(ACCOUNTS);
- for (int i = 0; i < readCnt; i++)
- readKeys.add(rnd.nextInt(ACCOUNTS));
+ MvccTestAccount account = cache.get(key);
- Map<Integer, MvccTestAccount> readRes = cache.getAll(readKeys);
+ assertNotNull(account);
- assertEquals(readKeys.size(), readRes.size());
+ accounts.put(key, account);
+ }
+ }
+ else {
+ Set<Integer> readKeys = new LinkedHashSet<>();
- accounts.putAll(readRes);
+ for (int i = 0; i < readCnt; i++)
+ readKeys.add(rnd.nextInt(ACCOUNTS));
+
+ Map<Integer, MvccTestAccount> readRes = cache.getAll(readKeys);
+
+ assertEquals(readKeys.size(), readRes.size());
+
+ accounts.putAll(readRes);
+ }
remaining = ACCOUNTS - accounts.size();
}
@@ -2119,7 +2253,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testSimpleRebalance() throws Exception {
+ public void testRebalanceSimple() throws Exception {
Ignite srv0 = startGrid(0);
IgniteCache<Integer, Integer> cache = (IgniteCache)srv0.createCache(
@@ -2182,7 +2316,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testSimpleRebalanceWithRemovedValues() throws Exception {
+ public void testRebalanceWithRemovedValuesSimple() throws Exception {
Ignite node = startGrid(0);
IgniteTransactions txs = node.transactions();
@@ -2462,32 +2596,109 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testReadInProgressCoordinatorFailsSimple_FromServer() throws Exception {
- for (int i = 1; i <= 3; i++) {
- readInProgressCoordinatorFailsSimple(false, i);
+ public void testTxInProgressCoordinatorChangeSimple() throws Exception {
+ txInProgressCoordinatorChangeSimple(false); // readOnly = false: the tested transaction also performs a put.
+ }
- afterTest();
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxInProgressCoordinatorChangeSimple_Readonly() throws Exception {
+ txInProgressCoordinatorChangeSimple(true); // readOnly = true: the tested transaction only reads.
+ }
+
+ /**
+ * @param readOnly If {@code true} tests read-only transaction.
+ * @throws Exception If failed.
+ */
+ private void txInProgressCoordinatorChangeSimple(boolean readOnly) throws Exception {
+ CacheCoordinatorsProcessor.coordinatorAssignClosure(new CoordinatorAssignClosure());
+ // Topology: grids 0..3 started first, grid 4 with the 'client' flag set, grid 5 after nodeAttr = CRD_ATTR.
+ Ignite srv0 = startGrids(4);
+
+ client = true;
+
+ startGrid(4);
+
+ client = false;
+ // Presumably nodes started from here on carry the CRD_ATTR attribute checked by CoordinatorAssignClosure - TODO confirm.
+ nodeAttr = CRD_ATTR;
+
+ int crdIdx = 5;
+
+ startGrid(crdIdx);
+
+ srv0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+ setNodeFilter(new CoordinatorNodeFilter()));
+
+ Set<Integer> keys = F.asSet(1, 2, 3);
+ // One transaction is run from each of grids 0..4; node crdIdx is replaced during every transaction.
+ for (int i = 0; i < 5; i++) {
+ Ignite node = ignite(i);
+
+ info("Test with node: " + node.name());
+
+ IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
+
+ try (Transaction tx = node.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+ assertTrue(cache.getAll(keys).isEmpty());
+
+ if (!readOnly)
+ cache.put(0, 0);
+ // Start node crdIdx + 1 and stop node crdIdx while the transaction is still open.
+ startGrid(crdIdx + 1);
+
+ stopGrid(crdIdx);
+
+ crdIdx++;
+ // The commit happens only after the node swap above.
+ tx.commit();
+ }
+ // Verify cleanup on the most recently started node (helper is defined elsewhere in this class).
+ checkActiveQueriesCleanup(ignite(crdIdx));
}
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReadInProgressCoordinatorFailsSimple_FromServer() throws Exception {
+ readInProgressCoordinatorFailsSimple(false); // fromClient = false: read from a server node.
+ }
/**
* @throws Exception If failed.
*/
public void testReadInProgressCoordinatorFailsSimple_FromClient() throws Exception {
- for (int i = 1; i <= 3; i++) {
- readInProgressCoordinatorFailsSimple(true, i);
+ readInProgressCoordinatorFailsSimple(true);
+ }
- afterTest();
+ /**
+ * @param fromClient {@code True} if read from client node, otherwise from server node.
+ * @throws Exception If failed.
+ */
+ private void readInProgressCoordinatorFailsSimple(boolean fromClient) throws Exception {
+ for (boolean readInTx : new boolean[]{false, true}) { // Read outside a transaction first, then inside one.
+ for (int i = 1; i <= 3; i++) { // i is passed as crdChangeCnt: 1 to 3 coordinator changes.
+ readInProgressCoordinatorFailsSimple(fromClient, i, readInTx);
+ // Invoke the teardown hook between combinations (presumably it stops the started nodes - TODO confirm).
+ afterTest();
+ }
}
}
/**
* @param fromClient {@code True} if read from client node, otherwise from server node.
* @param crdChangeCnt Number of coordinator changes.
+ * @param readInTx {@code True} to read inside transaction.
* @throws Exception If failed.
*/
- private void readInProgressCoordinatorFailsSimple(boolean fromClient, int crdChangeCnt) throws Exception {
- info("readInProgressCoordinatorFailsSimple [fromClient=" + fromClient + ", crdChangeCnt=" + crdChangeCnt + ']');
+ private void readInProgressCoordinatorFailsSimple(boolean fromClient, int crdChangeCnt, final boolean readInTx)
+ throws Exception
+ {
+ info("readInProgressCoordinatorFailsSimple [fromClient=" + fromClient +
+ ", crdChangeCnt=" + crdChangeCnt +
+ ", readInTx=" + readInTx + ']');
testSpi = true;
@@ -2540,7 +2751,17 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
IgniteInternalFuture getFut = GridTestUtils.runAsync(new Callable() {
@Override public Object call() throws Exception {
- Map<Integer, Integer> res = cache.getAll(keys);
+ Map<Integer, Integer> res;
+
+ if (readInTx) {
+ try (Transaction tx = getNode.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+ res = cache.getAll(keys);
+
+ tx.rollback();
+ }
+ }
+ else
+ res = cache.getAll(keys);
assertEquals(20, res.size());
@@ -2936,7 +3157,19 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
for (int i = 0; i < 10; i++)
vals.put(i, val);
- try (Transaction tx = putNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ TransactionConcurrency concurrency;
+ TransactionIsolation isolation;
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ concurrency = PESSIMISTIC;
+ isolation = REPEATABLE_READ;
+ }
+ else {
+ concurrency = OPTIMISTIC;
+ isolation = SERIALIZABLE;
+ }
+
+ try (Transaction tx = putNode.transactions().txStart(concurrency, isolation)) {
for (String cacheName : cacheNames)
putNode.cache(cacheName).putAll(vals);
@@ -2993,7 +3226,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
MvccCoordinator crd = null;
for (Ignite node : G.allGrids()) {
- CacheCoordinatorsProcessor crdProc = ((IgniteKernal) node).context().cache().context().coordinators();
+ CacheCoordinatorsProcessor crdProc = ((IgniteKernal)node).context().cache().context().coordinators();
MvccCoordinator crd0 = crdProc.currentCoordinator();
@@ -3845,6 +4078,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
*
*/
static class CoordinatorAssignClosure implements IgniteClosure<Collection<ClusterNode>, ClusterNode> {
+ /** {@inheritDoc} */
@Override public ClusterNode apply(Collection<ClusterNode> clusterNodes) {
for (ClusterNode node : clusterNodes) {
if (node.attribute(CRD_ATTR) != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
new file mode 100644
index 0000000..dc10881
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccClusterRestartTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccConfigurationValidationTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTransactionsTest;
+
+/**
+ * Test suite that groups the cache MVCC test classes.
+ */
+public class IgniteCacheMvccTestSuite extends TestSuite {
+ /**
+ * @return Test suite containing the cache MVCC test classes.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("IgniteCache MVCC Test Suite");
+
+ suite.addTestSuite(CacheMvccTransactionsTest.class);
+ suite.addTestSuite(CacheMvccClusterRestartTest.class);
+ suite.addTestSuite(CacheMvccConfigurationValidationTest.class);
+
+ return suite;
+ }
+}