You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/01 05:55:20 UTC
[10/21] ignite git commit: IGNITE-7764: MVCC: cache API support. This
closes #4725.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 83bb81c..af74996 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -20,8 +20,10 @@ package org.apache.ignite.internal.processors.cache.mvcc;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -64,7 +66,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryCntr;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx;
@@ -86,9 +87,7 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
-import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
-import org.apache.ignite.transactions.TransactionOptimisticException;
import org.jetbrains.annotations.Nullable;
import org.junit.Assert;
@@ -98,11 +97,8 @@ import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstract
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SCAN;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker.MVCC_TRACKER_ID_NA;
-import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/**
* TODO IGNITE-6739: tests reload
@@ -117,40 +113,104 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
}
/**
- * @throws Exception If failed.
+ * @throws Exception if failed.
*/
- public void testPessimisticTx1() throws Exception {
- checkTx1(PESSIMISTIC, REPEATABLE_READ);
- }
+ public void testEmptyTx() throws Exception {
+ Ignite node = startGrids(2);
- /**
- * @throws Exception If failed.
- */
- public void testOptimisticSerializableTx1() throws Exception {
- checkTx1(OPTIMISTIC, SERIALIZABLE);
- }
+ IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, DFLT_PARTITION_COUNT));
- /**
- * @throws Exception If failed.
- */
- public void testOptimisticRepeatableReadTx1() throws Exception {
- checkTx1(OPTIMISTIC, REPEATABLE_READ);
+ cache.putAll(Collections.emptyMap());
+
+ IgniteTransactions txs = node.transactions();
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ tx.commit();
+ }
+ finally {
+ stopAllGrids();
+ }
}
/**
- * @throws Exception If failed.
+ * @throws Exception if failed.
*/
- public void testOptimisticReadCommittedTx1() throws Exception {
- checkTx1(OPTIMISTIC, READ_COMMITTED);
+ public void testImplicitTxOps() throws Exception {
+ checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() {
+ @Override public void apply(IgniteCache<Integer, Integer> cache) {
+ try {
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ Integer val = cache.get(key);
+
+ assertNull(val);
+
+ assertFalse(cache.containsKey(key));
+
+ cache.put(key, -1);
+
+ val = (Integer)checkAndGet(true, cache, key, GET, SCAN);
+
+ assertEquals(Integer.valueOf(-1), val);
+
+ assertTrue(cache.containsKey(key));
+
+ cache.put(key, key);
+
+ val = (Integer)checkAndGet(true, cache, key, GET, SCAN);
+
+ assertEquals(key, val);
+
+ cache.remove(key);
+
+ val = cache.get(key);
+
+ assertNull(val);
+
+ val = (Integer)checkAndGet(false, cache, key, SCAN, GET);
+
+ assertNull(val);
+
+ assertTrue(cache.putIfAbsent(key, key));
+
+ val = (Integer)checkAndGet(true, cache, key, GET, SCAN);
+
+ assertEquals(key, val);
+
+ val = cache.getAndReplace(key, -1);
+
+ assertEquals(key, val);
+
+ val = (Integer)checkAndGet(true, cache, key, GET, SCAN);
+
+ assertEquals(Integer.valueOf(-1), val);
+
+ val = cache.getAndRemove(key);
+
+ assertEquals(Integer.valueOf(-1), val);
+
+ val = cache.get(key);
+
+ assertNull(val);
+
+ val = (Integer)checkAndGet(false, cache, key, SCAN, GET);
+
+ assertNull(val);
+ }
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
}
/**
- * @param concurrency Transaction concurrency.
- * @param isolation Transaction isolation.
* @throws Exception If failed.
*/
- private void checkTx1(final TransactionConcurrency concurrency, final TransactionIsolation isolation)
- throws Exception {
+ public void testPessimisticTx1() throws Exception {
checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() {
@Override public void apply(IgniteCache<Integer, Integer> cache) {
try {
@@ -161,7 +221,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
for (Integer key : keys) {
log.info("Test key: " + key);
- try (Transaction tx = txs.txStart(concurrency, isolation)) {
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
Integer val = cache.get(key);
assertNull(val);
@@ -191,23 +251,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
* @throws Exception If failed.
*/
public void testPessimisticTx2() throws Exception {
- checkTx2(PESSIMISTIC, REPEATABLE_READ);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testOptimisticSerializableTx2() throws Exception {
- checkTx2(OPTIMISTIC, SERIALIZABLE);
- }
-
- /**
- * @param concurrency Transaction concurrency.
- * @param isolation Transaction isolation.
- * @throws Exception If failed.
- */
- private void checkTx2(final TransactionConcurrency concurrency, final TransactionIsolation isolation)
- throws Exception {
checkTxWithAllCaches(new CI1<IgniteCache<Integer, Integer>>() {
@Override public void apply(IgniteCache<Integer, Integer> cache) {
try {
@@ -218,7 +261,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
for (Integer key : keys) {
log.info("Test key: " + key);
- try (Transaction tx = txs.txStart(concurrency, isolation)) {
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache.put(key, key);
cache.put(key + 1, key + 1);
@@ -394,7 +437,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
keys.add(rnd.nextInt());
if (tx) {
- try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache.getAll(keys);
if (rnd.nextBoolean())
@@ -417,8 +460,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
* @throws Exception If failed.
*/
public void testTxReadIsolationSimple() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-7764");
-
Ignite srv0 = startGrids(4);
client = true;
@@ -453,37 +494,23 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
@Override public Void call() throws Exception {
IgniteCache<Object, Object> cache = node.cache(DEFAULT_CACHE_NAME);
- try (Transaction tx = node.transactions().txStart(OPTIMISTIC, isolation)) {
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
assertEquals(0, checkAndGet(false, cache, 0, SCAN, GET));
readStart.countDown();
assertTrue(readProceed.await(5, TimeUnit.SECONDS));
- if (isolation == READ_COMMITTED) {
- assertNull(checkAndGet(false, cache, 1, SCAN, GET));
-
- assertEquals(1, checkAndGet(false, cache, 2, SCAN, GET));
-
- Map<Object, Object> res = checkAndGetAll(false, cache, startVals.keySet(), SCAN, GET);
-
- assertEquals(startVals.size() / 2, res.size());
+ assertEquals(0, checkAndGet(true, cache, 1, GET, SCAN));
- for (Map.Entry<Object, Object> e : res.entrySet())
- assertEquals("Invalid value for key: " + e.getKey(), 1, e.getValue());
- }
- else {
- assertEquals(0, checkAndGet(true, cache, 1, GET, SCAN));
-
- assertEquals(0, checkAndGet(true, cache, 2, GET, SCAN));
+ assertEquals(0, checkAndGet(true, cache, 2, GET, SCAN));
- Map<Object, Object> res = checkAndGetAll(true, cache, startVals.keySet(), GET, SCAN);
+ Map<Object, Object> res = checkAndGetAll(true, cache, startVals.keySet(), GET, SCAN);
- assertEquals(startVals.size(), res.size());
+ assertEquals(startVals.size(), res.size());
- for (Map.Entry<Object, Object> e : res.entrySet())
- assertEquals("Invalid value for key: " + e.getKey(), 0, e.getValue());
- }
+ for (Map.Entry<Object, Object> e : res.entrySet())
+ assertEquals("Invalid value for key: " + e.getKey(), 0, e.getValue());
tx.rollback();
}
@@ -589,8 +616,8 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
}
/**
- * @throws Exception If failed.
* @param largeKeys {@code True} to use large keys (not fitting in single page).
+ * @throws Exception If failed.
*/
private void putRemoveSimple(boolean largeKeys) throws Exception {
Ignite node = startGrid(0);
@@ -771,6 +798,8 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
* @throws Exception If failed.
*/
public void testWaitPreviousTxAck() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9470");
+
testSpi = true;
startGrid(0);
@@ -1032,11 +1061,10 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
@Override public Void call() throws Exception {
IgniteCache<Integer, Integer> cache = client.cache(srvCache.getName());
-
Map<Integer, Integer> vals;
if (inTx) {
- try (Transaction tx = client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
vals = checkAndGetAll(false, cache, F.asSet(key1, key2), SCAN, GET);
tx.rollback();
@@ -1085,6 +1113,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
* @throws Exception If failed.
*/
public void testCleanupWaitsForGet2() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9470");
/*
Simulate case when there are two active transactions modifying the same key
(it is possible if key lock is released but ack message is delayed), and at this moment
@@ -1430,8 +1459,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
putAllGetAll(RestartMode.RESTART_RND_SRV, 4, 2, 1, 64, null, SCAN, PUT);
}
-
-
/**
* @throws Exception If failed.
*/
@@ -1576,79 +1603,35 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
* @throws Exception If failed.
*/
public void testPessimisticTxGetAllReadsSnapshot_SingleNode_SinglePartition() throws Exception {
- txReadsSnapshot(1, 0, 0, 1, true, GET);
+ txReadsSnapshot(1, 0, 0, 1, GET);
}
/**
* @throws Exception If failed.
*/
public void testPessimisticTxGetAllReadsSnapshot_ClientServer() throws Exception {
- txReadsSnapshot(4, 2, 1, 64, true, GET);
+ txReadsSnapshot(4, 2, 1, 64, GET);
}
/**
* @throws Exception If failed.
*/
- public void testOptimisticTxGetAllReadsSnapshot_SingleNode() throws Exception {
- txReadsSnapshot(1, 0, 0, 64, false, GET);
+ public void testPessimisticTxScanReadsSnapshot_SingleNode_SinglePartition() throws Exception {
+ txReadsSnapshot(1, 0, 0, 1, SCAN);
}
/**
* @throws Exception If failed.
*/
- public void testOptimisticTxGetAllReadsSnapshot_SingleNode_SinglePartition() throws Exception {
- txReadsSnapshot(1, 0, 0, 1, false, GET);
+ public void testPessimisticTxScanReadsSnapshot_ClientServer() throws Exception {
+ txReadsSnapshot(4, 2, 1, 64, SCAN);
}
/**
- * @throws Exception If failed.
- */
- public void testOptimisticTxGetAllReadsSnapshot_ClientServer() throws Exception {
- txReadsSnapshot(4, 2, 1, 64, false, GET);
- }
-
-// TODO: IGNITE-7371
-// /**
-// * @throws Exception If failed.
-// */
-// public void testPessimisticTxScanReadsSnapshot_SingleNode_SinglePartition() throws Exception {
-// txReadsSnapshot(1, 0, 0, 1, true, SCAN);
-// }
-//
-// /**
-// * @throws Exception If failed.
-// */
-// public void testPessimisticTxScanReadsSnapshot_ClientServer() throws Exception {
-// txReadsSnapshot(4, 2, 1, 64, true, SCAN);
-// }
-//
-// /**
-// * @throws Exception If failed.
-// */
-// public void testOptimisticTxScanReadsSnapshot_SingleNode() throws Exception {
-// txReadsSnapshot(1, 0, 0, 64, false, SCAN);
-// }
-//
-// /**
-// * @throws Exception If failed.
-// */
-// public void testOptimisticTxScanReadsSnapshot_SingleNode_SinglePartition() throws Exception {
-// txReadsSnapshot(1, 0, 0, 1, false, SCAN);
-// }
-//
-// /**
-// * @throws Exception If failed.
-// */
-// public void testOptimisticTxScanReadsSnapshot_ClientServer() throws Exception {
-// txReadsSnapshot(4, 2, 1, 64, false, SCAN);
-// }
-
- /**
* @param srvs Number of server nodes.
* @param clients Number of client nodes.
* @param cacheBackups Number of cache backups.
* @param cacheParts Number of cache partitions.
- * @param pessimistic If {@code true} uses pessimistic tx, otherwise optimistic.
* @param readMode Read mode.
* @throws Exception If failed.
*/
@@ -1657,9 +1640,10 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
final int clients,
int cacheBackups,
int cacheParts,
- final boolean pessimistic,
ReadMode readMode
) throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9470");
+
final int ACCOUNTS = 20;
final int ACCOUNT_START_VAL = 1000;
@@ -1668,18 +1652,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
final int readers = 4;
- final TransactionConcurrency concurrency;
- final TransactionIsolation isolation;
-
- if (pessimistic) {
- concurrency = PESSIMISTIC;
- isolation = REPEATABLE_READ;
- }
- else {
- concurrency = OPTIMISTIC;
- isolation = SERIALIZABLE;
- }
-
final IgniteInClosure<IgniteCache<Object, Object>> init = new IgniteInClosure<IgniteCache<Object, Object>>() {
@Override public void apply(IgniteCache<Object, Object> cache) {
final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
@@ -1689,7 +1661,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
for (int i = 0; i < ACCOUNTS; i++)
accounts.put(i, new MvccTestAccount(ACCOUNT_START_VAL, 1));
- try (Transaction tx = txs.txStart(concurrency, isolation)) {
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache.putAll(accounts);
tx.commit();
@@ -1718,7 +1690,13 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
while (id1.equals(id2))
id2 = rnd.nextInt(ACCOUNTS);
- TreeSet<Integer> keys = new TreeSet<>();
+ if(id1 > id2) {
+ int tmp = id1;
+ id1 = id2;
+ id2 = tmp;
+ }
+
+ Set<Integer> keys = new HashSet<>();
keys.add(id1);
keys.add(id2);
@@ -1763,88 +1741,36 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
Map<Integer, MvccTestAccount> accounts = new HashMap<>();
- if (pessimistic) {
- try (Transaction tx = txs.txStart(concurrency, isolation)) {
- int remaining = ACCOUNTS;
-
- do {
- int readCnt = rnd.nextInt(remaining) + 1;
-
- Set<Integer> readKeys = new TreeSet<>();
-
- for (int i = 0; i < readCnt; i++)
- readKeys.add(accounts.size() + i);
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ int remaining = ACCOUNTS;
- Map<Integer, MvccTestAccount> readRes =
- checkAndGetAll(false, cache.cache, readKeys, readMode);
+ do {
+ int readCnt = rnd.nextInt(remaining) + 1;
- assertEquals(readCnt, readRes.size());
+ Set<Integer> readKeys = new TreeSet<>();
- accounts.putAll(readRes);
+ for (int i = 0; i < readCnt; i++)
+ readKeys.add(accounts.size() + i);
- remaining = ACCOUNTS - accounts.size();
- }
- while (remaining > 0);
+ Map<Integer, MvccTestAccount> readRes =
+ checkAndGetAll(false, cache.cache, readKeys, readMode);
- validateSum(accounts);
+ assertEquals(readCnt, readRes.size());
- tx.commit();
+ accounts.putAll(readRes);
- cnt++;
+ remaining = ACCOUNTS - accounts.size();
}
- finally {
- cache.readUnlock();
- }
- }
- else {
- try (Transaction tx = txs.txStart(concurrency, isolation)) {
- int remaining = ACCOUNTS;
-
- do {
- int readCnt = rnd.nextInt(remaining) + 1;
-
- if (rnd.nextInt(3) == 0) {
- for (int i = 0; i < readCnt; i++) {
- Integer key = rnd.nextInt(ACCOUNTS);
-
- MvccTestAccount account =
- (MvccTestAccount)checkAndGet(false, cache.cache, key, readMode);
-
- assertNotNull(account);
-
- accounts.put(key, account);
- }
- }
- else {
- Set<Integer> readKeys = new LinkedHashSet<>();
-
- for (int i = 0; i < readCnt; i++)
- readKeys.add(rnd.nextInt(ACCOUNTS));
-
- Map<Integer, MvccTestAccount> readRes =
- checkAndGetAll(false, cache.cache, readKeys, readMode);
+ while (remaining > 0);
- assertEquals(readKeys.size(), readRes.size());
-
- accounts.putAll(readRes);
- }
+ validateSum(accounts);
- remaining = ACCOUNTS - accounts.size();
- }
- while (remaining > 0);
-
- validateSum(accounts);
-
- cnt++;
+ tx.commit();
- tx.commit();
- }
- catch (TransactionOptimisticException ignore) {
- // No-op.
- }
- finally {
- cache.readUnlock();
- }
+ cnt++;
+ }
+ finally {
+ cache.readUnlock();
}
}
@@ -1954,9 +1880,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
int cacheBackups,
int cacheParts,
ReadMode readMode
- )
- throws Exception
- {
+ ) throws Exception {
final int writers = 4;
final int readers = 4;
@@ -2103,50 +2027,26 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
Map<Integer, Integer> map = new TreeMap<>();
+ Set<Integer> keys = new LinkedHashSet();
int cnt = 0;
while (!stop.get()) {
- int keys = rnd.nextInt(32) + 1;
+ int keysCnt = rnd.nextInt(32) + 1;
- while (map.size() < keys)
- map.put(rnd.nextInt(KEYS), cnt);
+ while (keys.size() < keysCnt) {
+ int key = rnd.nextInt(KEYS);
+
+ if(keys.add(key))
+ map.put(key, cnt);
+ }
TestCache<Integer, Integer> cache = randomCache(caches, rnd);
try {
IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions();
- TransactionConcurrency concurrency;
- TransactionIsolation isolation;
-
- switch (rnd.nextInt(3)) {
- case 0: {
- concurrency = PESSIMISTIC;
- isolation = REPEATABLE_READ;
-
- break;
- }
- case 1: {
- concurrency = OPTIMISTIC;
- isolation = REPEATABLE_READ;
-
- break;
- }
- case 2: {
- concurrency = OPTIMISTIC;
- isolation = SERIALIZABLE;
-
- break;
- }
- default: {
- fail();
-
- return;
- }
- }
-
- try (Transaction tx = txs.txStart(concurrency, isolation)) {
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
if (rnd.nextBoolean()) {
Map<Integer, Integer> res = checkAndGetAll(false, cache.cache, map.keySet(),
rnd.nextBoolean() ? GET : SCAN);
@@ -2158,18 +2058,16 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
tx.commit();
}
- catch (TransactionOptimisticException e) {
- assertEquals(SERIALIZABLE, isolation);
- }
catch (Exception e) {
Assert.assertTrue("Unexpected error: " + e, X.hasCause(e, ClusterTopologyException.class));
}
}
finally {
cache.readUnlock();
- }
- map.clear();
+ keys.clear();
+ map.clear();
+ }
cnt++;
}
@@ -2282,7 +2180,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
Ignite srv0 = startGrid(0);
- IgniteCache<Integer, Integer> cache = (IgniteCache)srv0.createCache(
+ IgniteCache<Integer, Integer> cache = (IgniteCache)srv0.createCache(
cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT));
Map<Integer, Integer> map;
@@ -2413,32 +2311,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
* @throws Exception If failed.
*/
public void testTxPrepareFailureSimplePessimisticTx() throws Exception {
- txPrepareFailureSimple(PESSIMISTIC, REPEATABLE_READ);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxPrepareFailureSimpleSerializableTx() throws Exception {
- txPrepareFailureSimple(OPTIMISTIC, SERIALIZABLE);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxPrepareFailureSimpleOptimisticTx() throws Exception {
- txPrepareFailureSimple(OPTIMISTIC, REPEATABLE_READ);
- }
-
- /**
- * @param concurrency Transaction concurrency.
- * @param isolation Transaction isolation.
- * @throws Exception If failed.
- */
- private void txPrepareFailureSimple(
- final TransactionConcurrency concurrency,
- final TransactionIsolation isolation
- ) throws Exception {
testSpi = true;
startGrids(3);
@@ -2460,7 +2332,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
@Override public Object call() throws Exception {
try {
- try (Transaction tx = client.transactions().txStart(concurrency, isolation)) {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache.put(key1, 1);
cache.put(key2, 2);
@@ -2477,7 +2349,16 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
}
}, "tx-thread");
- srv1Spi.waitForBlocked();
+ GridTestUtils.waitForCondition(
+ new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return srv1Spi.hasBlockedMessages() || fut.isDone() && fut.error() != null;
+ }
+ }, 10_000
+ );
+
+ if (fut.isDone())
+ fut.get(); // Just to fail with future error.
assertFalse(fut.isDone());
@@ -2488,7 +2369,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
assertNull(cache.get(key1));
assertNull(cache.get(key2));
- try (Transaction tx = client.transactions().txStart(concurrency, isolation)) {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache.put(key1, 1);
cache.put(key2, 2);
@@ -2502,64 +2383,9 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testSerializableTxRemap() throws Exception {
- testSpi = true;
-
- startGrids(2);
-
- client = true;
-
- final Ignite client = startGrid(2);
-
- final IgniteCache cache = client.createCache(
- cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT));
-
- final Map<Object, Object> vals = new HashMap<>();
-
- for (int i = 0; i < 100; i++)
- vals.put(i, i);
-
- TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(ignite(2));
-
- clientSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
- @Override public boolean apply(ClusterNode node, Message msg) {
- return msg instanceof GridNearTxPrepareRequest;
- }
- });
-
- IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
- @Override public Object call() throws Exception {
- try (Transaction tx = client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
- cache.putAll(vals);
-
- tx.commit();
- }
-
- return null;
- }
- }, "tx-thread");
-
- clientSpi.waitForBlocked(2);
-
- this.client = false;
-
- startGrid(3);
-
- assertFalse(fut.isDone());
-
- clientSpi.stopBlock();
-
- fut.get();
-
- for (Ignite node : G.allGrids())
- checkValues(vals, node.cache(cache.getName()));
- }
-
-
- /**
- * @throws Exception If failed.
- */
public void testMvccCoordinatorChangeSimple() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9722");
+
Ignite srv0 = startGrid(0);
final List<String> cacheNames = new ArrayList<>();
@@ -2624,19 +2450,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
for (int i = 0; i < 10; i++)
vals.put(i, val);
- TransactionConcurrency concurrency;
- TransactionIsolation isolation;
-
- if (ThreadLocalRandom.current().nextBoolean()) {
- concurrency = PESSIMISTIC;
- isolation = REPEATABLE_READ;
- }
- else {
- concurrency = OPTIMISTIC;
- isolation = SERIALIZABLE;
- }
-
- try (Transaction tx = putNode.transactions().txStart(concurrency, isolation)) {
+ try (Transaction tx = putNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
for (String cacheName : cacheNames)
putNode.cache(cacheName).putAll(vals);
@@ -2906,8 +2720,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
* @throws Exception If failed.
*/
public void testUpdate_N_Objects_ClientServer_Backups1_Scan() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-7764");
-
int[] nValues = {3, 5, 10};
for (int n : nValues) {
@@ -2924,7 +2736,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
doImplicitPartsScanTest(1, 0, 0, 1, 10_000);
}
-
/**
* @throws Exception If failed.
*/
@@ -2967,6 +2778,8 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
int cacheBackups,
int cacheParts,
long time) throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9470");
+
final int KEYS_PER_PART = 20;
final int writers = 4;
@@ -3033,6 +2846,12 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
while (k1.equals(k2))
k2 = partKeys.get(rnd.nextInt(KEYS_PER_PART));
+ if(k1 > k2) {
+ int tmp = k1;
+ k1 = k2;
+ k2 = tmp;
+ }
+
TreeSet<Integer> keys = new TreeSet<>();
keys.add(k1);
@@ -3074,7 +2893,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
ScanQuery<Integer, MvccTestAccount> qry = new ScanQuery<>(part);
- List<Cache.Entry<Integer, MvccTestAccount>> res = cache.cache.query(qry).getAll();
+ List<Cache.Entry<Integer, MvccTestAccount>> res = cache.cache.query(qry).getAll();
int sum = 0;
@@ -3100,7 +2919,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
try {
ScanQuery<Integer, MvccTestAccount> qry = new ScanQuery<>();
- List<Cache.Entry<Integer, MvccTestAccount>> res = cache.cache.query(qry).getAll();
+ List<Cache.Entry<Integer, MvccTestAccount>> res = cache.cache.query(qry).getAll();
int sum = 0;
@@ -3140,8 +2959,6 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
* @throws IgniteCheckedException If failed.
*/
public void testSize() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-9451");
-
Ignite node = startGrid(0);
IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1));
@@ -3150,6 +2967,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
final int KEYS = 10;
+ // Initial put.
for (int i = 0; i < KEYS; i++) {
final Integer key = i;
@@ -3162,6 +2980,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
assertEquals(i + 1, cache.size());
}
+ // Update.
for (int i = 0; i < KEYS; i++) {
final Integer key = i;
@@ -3176,6 +2995,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
int size = KEYS;
+ // Remove.
for (int i = 0; i < KEYS; i++) {
if (i % 2 == 0) {
final Integer key = i;
@@ -3207,6 +3027,48 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
}
}
+ // Check rollback create.
+ for (int i = 0; i < KEYS; i++) {
+ if (i % 2 == 0) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, i);
+
+ tx.rollback();
+ }
+
+ assertEquals(size, cache.size());
+ }
+ }
+
+ // Check rollback update.
+ for (int i = 0; i < KEYS; i++) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key, -1);
+
+ tx.rollback();
+ }
+
+ assertEquals(size, cache.size());
+ }
+
+ // Check rollback remove.
+ for (int i = 0; i < KEYS; i++) {
+ final Integer key = i;
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.remove(key);
+
+ tx.rollback();
+ }
+
+ assertEquals(size, cache.size());
+ }
+
+ // Restore original state.
for (int i = 0; i < KEYS; i++) {
if (i % 2 == 0) {
final Integer key = i;
@@ -3222,6 +3084,10 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
assertEquals(size, cache.size());
}
}
+
+ // Check state.
+ for (int i = 0; i < KEYS; i++)
+ assertEquals(i, cache.get(i));
}
/**
@@ -3467,7 +3333,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
* @param readModes Read modes to check.
* @return Value.
*/
- private Object checkAndGet(boolean inTx, IgniteCache cache, Object key, ReadMode ... readModes) {
+ private Object checkAndGet(boolean inTx, IgniteCache cache, Object key, ReadMode... readModes) {
assert readModes != null && readModes.length > 0;
if (inTx)
@@ -3531,13 +3397,12 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
}
}
-
/**
* Checks values obtained with different read modes.
* And returns value in case of it's equality for all read modes.
* Do not use in tests with writers contention.
*
- * // TODO remove inTx flag in IGNITE-7764
+ * // TODO remove inTx flag in IGNITE-6938
* @param inTx Flag whether current read is inside transaction.
* This is because reads can't see writes made in current transaction.
* @param cache Cache.
@@ -3545,7 +3410,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
* @param readModes Read modes to check.
* @return Value.
*/
- private Map checkAndGetAll(boolean inTx, IgniteCache cache, Set keys, ReadMode ... readModes) {
+ private Map checkAndGetAll(boolean inTx, IgniteCache cache, Set keys, ReadMode... readModes) {
assert readModes != null && readModes.length > 0;
if (inTx)
@@ -3571,11 +3436,10 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
return prevVal;
}
-
/**
* Reads value from cache for the given key using given read mode.
*
- * // TODO IGNITE-7764 remove inTx flag
+ * // TODO IGNITE-6938 remove inTx flag
* // TODO IGNITE-6739 add SQL-get support "select _key, _val from cache where _key in ... keySet"
* @param inTx Flag whether current read is inside transaction.
* This is because reads can't see writes made in current transaction.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java
index d7948bd..381d9a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorMvccSelfTest.java
@@ -76,6 +76,11 @@ public class DataStreamProcessorMvccSelfTest extends DataStreamProcessorSelfTest
}
/** {@inheritDoc} */
+ @Override public void testFlushTimeout() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9321");
+ }
+
+ /** {@inheritDoc} */
@Override public void testLocal() throws Exception {
// Do not check local caches with MVCC enabled.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariations.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariations.java b/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariations.java
index ca22b56..1b85d33 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariations.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariations.java
@@ -42,7 +42,6 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.configuration.TopologyValidator;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.cache.MapCacheStoreStrategy;
-import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
import static org.apache.ignite.internal.util.lang.GridFunc.asArray;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DhtResultSetEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DhtResultSetEnlistFuture.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DhtResultSetEnlistFuture.java
index 1d382f7..7f22107 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DhtResultSetEnlistFuture.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DhtResultSetEnlistFuture.java
@@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.query.h2;
import java.sql.ResultSet;
import java.util.UUID;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxAbstractEnlistFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryAbstractEnlistFuture;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
@@ -31,7 +31,7 @@ import org.jetbrains.annotations.Nullable;
/**
*
*/
-public class DhtResultSetEnlistFuture extends GridDhtTxAbstractEnlistFuture implements ResultSetEnlistFuture {
+public class DhtResultSetEnlistFuture extends GridDhtTxQueryAbstractEnlistFuture implements ResultSetEnlistFuture {
/** */
private ResultSet rs;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/NearResultSetEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/NearResultSetEnlistFuture.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/NearResultSetEnlistFuture.java
index 1856430..968a856 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/NearResultSetEnlistFuture.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/NearResultSetEnlistFuture.java
@@ -32,9 +32,6 @@ import org.jetbrains.annotations.Nullable;
*
*/
public class NearResultSetEnlistFuture extends GridNearTxQueryResultsEnlistFuture implements ResultSetEnlistFuture {
- /** */
- private static final long serialVersionUID = 877907044489718378L;
-
/**
* @param nearNodeId Near node ID.
* @param nearLockVer Near lock version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
index 8c6f407..76f8013 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SqlTransactionsCommandsWithMvccEnabledSelfTest.java
@@ -132,6 +132,8 @@ public class SqlTransactionsCommandsWithMvccEnabledSelfTest extends AbstractSche
* Test that attempting to perform various SQL operations within non SQL transaction yields an exception.
*/
public void testSqlOperationsWithinNonSqlTransaction() {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9470");
+
assertSqlOperationWithinNonSqlTransactionThrows("COMMIT");
assertSqlOperationWithinNonSqlTransactionThrows("ROLLBACK");
@@ -230,8 +232,8 @@ public class SqlTransactionsCommandsWithMvccEnabledSelfTest extends AbstractSche
return null;
}
- }, IgniteCheckedException.class,
- "SQL queries and cache operations may not be used in the same transaction.");
+ }, UnsupportedOperationException.class,
+ "operations are not supported on transactional caches when MVCC is enabled.");
}
finally {
try {
@@ -266,78 +268,6 @@ public class SqlTransactionsCommandsWithMvccEnabledSelfTest extends AbstractSche
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void testCacheOperationsFromSqlTransaction() {
- checkCacheOperationThrows("get", 1);
-
- checkCacheOperationThrows("getAsync", 1);
-
- checkCacheOperationThrows("getEntry", 1);
-
- checkCacheOperationThrows("getEntryAsync", 1);
-
- checkCacheOperationThrows("getAndPut", 1, 1);
-
- checkCacheOperationThrows("getAndPutAsync", 1, 1);
-
- checkCacheOperationThrows("getAndPutIfAbsent", 1, 1);
-
- checkCacheOperationThrows("getAndPutIfAbsentAsync", 1, 1);
-
- checkCacheOperationThrows("getAndReplace", 1, 1);
-
- checkCacheOperationThrows("getAndReplaceAsync", 1, 1);
-
- checkCacheOperationThrows("getAndRemove", 1);
-
- checkCacheOperationThrows("getAndRemoveAsync", 1);
-
- checkCacheOperationThrows("containsKey", 1);
-
- checkCacheOperationThrows("containsKeyAsync", 1);
-
- checkCacheOperationThrows("put", 1, 1);
-
- checkCacheOperationThrows("putAsync", 1, 1);
-
- checkCacheOperationThrows("putIfAbsent", 1, 1);
-
- checkCacheOperationThrows("putIfAbsentAsync", 1, 1);
-
- checkCacheOperationThrows("remove", 1);
-
- checkCacheOperationThrows("removeAsync", 1);
-
- checkCacheOperationThrows("remove", 1, 1);
-
- checkCacheOperationThrows("removeAsync", 1, 1);
-
- checkCacheOperationThrows("replace", 1, 1);
-
- checkCacheOperationThrows("replaceAsync", 1, 1);
-
- checkCacheOperationThrows("replace", 1, 1, 1);
-
- checkCacheOperationThrows("replaceAsync", 1, 1, 1);
-
- checkCacheOperationThrows("getAll", new HashSet<>(Arrays.asList(1, 2)));
-
- checkCacheOperationThrows("containsKeys", new HashSet<>(Arrays.asList(1, 2)));
-
- checkCacheOperationThrows("getEntries", new HashSet<>(Arrays.asList(1, 2)));
-
- checkCacheOperationThrows("putAll", Collections.singletonMap(1, 1));
-
- checkCacheOperationThrows("removeAll", new HashSet<>(Arrays.asList(1, 2)));
-
- checkCacheOperationThrows("getAllAsync", new HashSet<>(Arrays.asList(1, 2)));
-
- checkCacheOperationThrows("containsKeysAsync", new HashSet<>(Arrays.asList(1, 2)));
-
- checkCacheOperationThrows("getEntriesAsync", new HashSet<>(Arrays.asList(1, 2)));
-
- checkCacheOperationThrows("putAllAsync", Collections.singletonMap(1, 1));
-
- checkCacheOperationThrows("removeAllAsync", new HashSet<>(Arrays.asList(1, 2)));
-
checkCacheOperationThrows("invoke", 1, ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
checkCacheOperationThrows("invoke", 1, CACHE_ENTRY_PROC, X.EMPTY_OBJECT_ARRAY);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
index 5c81974..00c748e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
@@ -96,6 +96,8 @@ public abstract class CacheMvccSelectForUpdateQueryAbstractTest extends CacheMvc
*
*/
public void testSelectForUpdateDistributed() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9724");
+
doTestSelectForUpdateDistributed("Person", false);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
index 796c0bb..4ea53e0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
@@ -151,6 +151,8 @@ public abstract class CacheMvccSqlQueriesAbstractTest extends CacheMvccAbstractT
* @throws Exception If failed.
*/
private void updateSingleValue(boolean singleNode, final boolean locQry) throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9540");
+
final int VALS = 100;
final int writers = 4;
@@ -377,6 +379,8 @@ public abstract class CacheMvccSqlQueriesAbstractTest extends CacheMvccAbstractT
* @throws Exception If failed.
*/
private void joinTransactional(boolean singleNode, final boolean distributedJoin) throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9470");
+
final int KEYS = 100;
final int writers = 4;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
index b881f02..7076362 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java
@@ -1120,6 +1120,8 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac
* @throws Exception If failed.
*/
public void testQueryInsertUpdateMultithread() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-9470");
+
ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT)
.setIndexedTypes(Integer.class, Integer.class);
@@ -1196,14 +1198,17 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac
}
}, 1));
- fut.markInitialized();
-
try {
+ fut.markInitialized();
+
fut.get(TX_TIMEOUT);
}
catch (IgniteCheckedException e) {
onException(ex, e);
}
+ finally {
+ phaser.forceTermination();
+ }
Exception ex0 = ex.get();
@@ -1248,7 +1253,7 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac
try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
tx.timeout(TX_TIMEOUT);
- barrier.await();
+ barrier.await(TX_TIMEOUT, TimeUnit.MILLISECONDS);
IgniteCache<Object, Object> cache0 = node.cache(DEFAULT_CACHE_NAME);
@@ -1262,7 +1267,7 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac
}
}
- barrier.await();
+ barrier.await(TX_TIMEOUT, TimeUnit.MILLISECONDS);
qry = new SqlFieldsQuery("UPDATE Integer SET _val = (_key * 10)");
@@ -1820,7 +1825,7 @@ public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstrac
do {
p = phaser.arriveAndAwaitAdvance();
}
- while (p < phase);
+ while (p < phase && p >= 0 /* check termination */ );
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java
new file mode 100644
index 0000000..46aeaa1
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadBulkOpsTest.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.GET;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT;
+
+/**
+ * Test basic mvcc bulk cache operations.
+ */
+public class MvccRepeatableReadBulkOpsTest extends CacheMvccAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /** */
+ private int nodesCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ cleanPersistenceDir();
+
+ startGridsMultiThreaded(nodesCount() - 1);
+
+ client = true;
+
+ startGrid(nodesCount() - 1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ grid(0).createCache(cacheConfiguration(cacheMode(), FULL_SYNC, 1, 32).
+ setIndexedTypes(Integer.class, MvccTestAccount.class));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ grid(0).destroyCache(DEFAULT_CACHE_NAME);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRepeatableReadIsolationGetPut() throws Exception {
+ checkOperations(GET, GET, PUT, true);
+ checkOperations(GET, GET, PUT, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRepeatableReadIsolationSqlPut() throws Exception {
+ checkOperations(SQL, SQL, PUT, true);
+ checkOperations(SQL, SQL, PUT, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRepeatableReadIsolationSqlDml() throws Exception {
+ checkOperations(SQL, SQL, DML, true);
+ checkOperations(SQL, SQL, DML, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRepeatableReadIsolationGetDml() throws Exception {
+ checkOperations(GET, GET, DML, true);
+ checkOperations(GET, GET, DML, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRepeatableReadIsolationMixedPut() throws Exception {
+ checkOperations(SQL, GET, PUT, false);
+ checkOperations(SQL, GET, PUT, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRepeatableReadIsolationMixedPut2() throws Exception {
+ checkOperations(GET, SQL, PUT, false);
+ checkOperations(GET, SQL, PUT, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRepeatableReadIsolationMixedDml() throws Exception {
+ checkOperations(SQL, GET, DML, false);
+ checkOperations(SQL, GET, DML, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRepeatableReadIsolationMixedDml2() throws Exception {
+ checkOperations(GET, SQL, DML, false);
+ checkOperations(GET, SQL, DML, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOperationConsistency() throws Exception {
+ checkOperationsConsistency(PUT, false);
+ checkOperationsConsistency(DML, false);
+ checkOperationsConsistency(PUT, true);
+ checkOperationsConsistency(DML, true);
+ }
+
+ /**
+ * Checks SQL and CacheAPI operation isolation consistency.
+ *
+ * @param readModeBefore read mode used before value updated.
+ * @param readModeAfter read mode used after value updated.
+ * @param writeMode write mode used for update.
+ * @throws Exception If failed.
+ */
+ private void checkOperations(ReadMode readModeBefore, ReadMode readModeAfter,
+ WriteMode writeMode, boolean readFromClient) throws Exception {
+ Ignite node1 = grid(readFromClient ? nodesCount() - 1 : 0);
+ Ignite node2 = grid(readFromClient ? 0 : nodesCount() - 1);
+
+ TestCache<Integer, MvccTestAccount> cache1 = new TestCache<>(node1.cache(DEFAULT_CACHE_NAME));
+ TestCache<Integer, MvccTestAccount> cache2 = new TestCache<>(node2.cache(DEFAULT_CACHE_NAME));
+
+ final Set<Integer> keysForUpdate = new HashSet<>(3);
+ final Set<Integer> keysForRemove = new HashSet<>(3);
+
+ final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForUpdate, keysForRemove);
+
+ final Map<Integer, MvccTestAccount> initialMap = allKeys.stream().collect(
+ Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
+
+ final Map<Integer, MvccTestAccount> updateMap = keysForUpdate.stream().collect(Collectors.toMap(Function.identity(),
+ k -> new MvccTestAccount(k, 2))); /* Removed keys are excluded. */
+
+ cache1.cache.putAll(initialMap);
+
+ IgniteTransactions txs1 = node1.transactions();
+ IgniteTransactions txs2 = node2.transactions();
+
+ CountDownLatch updateStart = new CountDownLatch(1);
+ CountDownLatch updateFinish = new CountDownLatch(1);
+
+ // Start concurrent transactions and check isolation.
+ IgniteInternalFuture<Void> updater = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ updateStart.await();
+
+ try (Transaction tx = txs2.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+
+ updateEntries(cache2, updateMap, writeMode);
+ removeEntries(cache2, keysForRemove, writeMode);
+
+ checkContains(cache2, true, updateMap.keySet());
+ checkContains(cache2, false, keysForRemove);
+
+ assertEquals(updateMap, cache2.cache.getAll(allKeys));
+
+ tx.commit();
+ }
+
+ updateFinish.countDown();
+
+ return null;
+ }
+ });
+
+ IgniteInternalFuture<Void> reader = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ try (Transaction tx = txs1.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+ assertEquals(initialMap, getEntries(cache1, allKeys, readModeBefore));
+
+ checkContains(cache1, true, allKeys);
+
+ updateStart.countDown();
+ updateFinish.await();
+
+ assertEquals(initialMap, getEntries(cache1, allKeys, readModeAfter));
+
+ checkContains(cache1, true,allKeys);
+
+ tx.commit();
+ }
+
+ return null;
+ }
+ });
+
+ try {
+ updater.get(3_000, TimeUnit.MILLISECONDS);
+ reader.get(3_000, TimeUnit.MILLISECONDS);
+ }
+ catch (Throwable e) {
+ throw new AssertionError(e);
+ }
+ finally {
+ updateStart.countDown();
+ updateFinish.countDown();
+ }
+
+ assertEquals(updateMap, cache1.cache.getAll(allKeys));
+ }
+
+ /**
+ * Generate 2 sets of keys. Each set contains primary, backup and non-affinity key for given node cache.
+ *
+ * @param cache Cache.
+ * @param keySet1 Key set.
+ * @param keySet2 Key set.
+ * @return All keys.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected Set<Integer> generateKeySet(IgniteCache<Object, Object> cache, Set<Integer> keySet1,
+ Set<Integer> keySet2) throws IgniteCheckedException {
+ LinkedHashSet<Integer> allKeys = new LinkedHashSet<>();
+
+ allKeys.addAll(primaryKeys(cache, 2));
+ allKeys.addAll(backupKeys(cache, 2, 1));
+ allKeys.addAll(nearKeys(cache, 2, 1));
+
+ List<Integer> keys0 = new ArrayList<>(allKeys);
+
+ for (int i = 0; i < 6; i++) {
+ if (i % 2 == 0)
+ keySet1.add(keys0.get(i));
+ else
+ keySet2.add(keys0.get(i));
+ }
+
+ assert allKeys.size() == 6; // Expects no duplicates.
+
+ return allKeys;
+ }
+
+ /**
+ * Checks that SQL and cache API operations see consistent results before and after update.
+ *
+ * @throws Exception If failed.
+ */
+ private void checkOperationsConsistency(WriteMode writeMode, boolean requestFromClient) throws Exception {
+ Ignite node = grid(requestFromClient ? nodesCount() - 1 : 0);
+
+ TestCache<Integer, MvccTestAccount> cache = new TestCache<>(node.cache(DEFAULT_CACHE_NAME));
+
+ final Set<Integer> keysForUpdate = new HashSet<>(3);
+ final Set<Integer> keysForRemove = new HashSet<>(3);
+
+ final Set<Integer> allKeys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), keysForUpdate, keysForRemove);
+
+ int updCnt = 1;
+
+ final Map<Integer, MvccTestAccount> initialVals = allKeys.stream().collect(
+ Collectors.toMap(k -> k, k -> new MvccTestAccount(k, 1)));
+
+ cache.cache.putAll(initialVals);
+
+ IgniteTransactions txs = node.transactions();
+
+ Map<Integer, MvccTestAccount> updatedVals = null;
+
+ try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+ Map<Integer, MvccTestAccount> vals1 = getEntries(cache, allKeys, GET);
+ Map<Integer, MvccTestAccount> vals2 = getEntries(cache, allKeys, SQL);
+
+ assertEquals(initialVals, vals1);
+ assertEquals(initialVals, vals2);
+
+ for (ReadMode readMode : new ReadMode[] {GET, SQL}) {
+ int updCnt0 = ++updCnt;
+
+ updatedVals = keysForUpdate.stream().collect(Collectors.toMap(Function.identity(),
+ k -> new MvccTestAccount(k, updCnt0)));
+
+ updateEntries(cache, updatedVals, writeMode);
+ removeEntries(cache, keysForRemove, writeMode);
+
+ assertEquals(String.valueOf(readMode), updatedVals, getEntries(cache, allKeys, readMode));
+ }
+
+ tx.commit();
+ }
+
+ try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+ assertEquals(updatedVals, getEntries(cache, allKeys, GET));
+ assertEquals(updatedVals, getEntries(cache, allKeys, SQL));
+
+ tx.commit();
+ }
+ }
+
+ /**
+ * Gets values with given read mode.
+ *
+ * @param cache Cache.
+ * @param keys Keys to be read.
+ * @param readMode Read mode.
+ * @return Key-value result map.
+ */
+ protected Map<Integer, MvccTestAccount> getEntries(
+ TestCache<Integer, MvccTestAccount> cache,
+ Set<Integer> keys,
+ ReadMode readMode) {
+ switch (readMode) {
+ case GET:
+ return cache.cache.getAll(keys);
+ case SQL:
+ return getAllSql(cache);
+ default:
+ fail();
+ }
+
+ return null;
+ }
+
+ /**
+ * Updates entries with given write mode.
+ *
+ * @param cache Cache.
+ * @param entries Entries to be updated.
+ * @param writeMode Write mode.
+ */
+ protected void updateEntries(
+ TestCache<Integer, MvccTestAccount> cache,
+ Map<Integer, MvccTestAccount> entries,
+ WriteMode writeMode) {
+ switch (writeMode) {
+ case PUT: {
+ cache.cache.putAll(entries);
+
+ break;
+ }
+ case DML: {
+ for (Map.Entry<Integer, MvccTestAccount> e : entries.entrySet())
+ mergeSql(cache, e.getKey(), e.getValue().val, e.getValue().updateCnt);
+
+ break;
+ }
+ default:
+ fail();
+ }
+ }
+
+ /**
+ * Removes entries with given write mode.
+ *
+ * @param cache Cache.
+ * @param keys Keys to be deleted.
+ * @param writeMode Write mode.
+ */
+ protected void removeEntries(
+ TestCache<Integer, MvccTestAccount> cache,
+ Set<Integer> keys,
+ WriteMode writeMode) {
+ switch (writeMode) {
+ case PUT: {
+ cache.cache.removeAll(keys);
+
+ break;
+ }
+ case DML: {
+ for (Integer key : keys)
+ removeSql(cache, key);
+
+ break;
+ }
+ default:
+ fail();
+ }
+ }
+
+ /**
+ * Check cache contains entries.
+ *
+ * @param cache Cache.
+ * @param expected Expected result.
+ * @param keys Keys to check.
+ */
+ protected void checkContains(TestCache<Integer, MvccTestAccount> cache, boolean expected, Set<Integer> keys) {
+ assertEquals(expected, cache.cache.containsKeys(keys));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java
new file mode 100644
index 0000000..c782f98
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccRepeatableReadOperationsTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.GET;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
+
+/**
+ * Test basic mvcc cache operations.
+ */
+public class MvccRepeatableReadOperationsTest extends MvccRepeatableReadBulkOpsTest {
+ /** {@inheritDoc} */
+ @Override protected Map<Integer, MvccTestAccount> getEntries(
+ TestCache<Integer, MvccTestAccount> cache,
+ Set<Integer> keys,
+ ReadMode readMode) {
+
+ switch (readMode) {
+ case GET: {
+ Map<Integer, MvccTestAccount> res = new HashMap<>();
+
+ for (Integer key : keys) {
+ MvccTestAccount val = cache.cache.get(key);
+
+ if(val != null)
+ res.put(key, val);
+ }
+
+ return res;
+ }
+ case SQL:
+ return getAllSql(cache);
+ default:
+ fail();
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ protected void updateEntries(
+ TestCache<Integer, MvccTestAccount> cache,
+ Map<Integer, MvccTestAccount> entries,
+ WriteMode writeMode) {
+ switch (writeMode) {
+ case PUT: {
+ for (Map.Entry<Integer, MvccTestAccount> e : entries.entrySet())
+ if (e.getValue() == null)
+ cache.cache.remove(e.getKey());
+ else
+ cache.cache.put(e.getKey(), e.getValue());
+
+ break;
+ }
+ case DML: {
+ for (Map.Entry<Integer, MvccTestAccount> e : entries.entrySet()) {
+ if (e.getValue() == null)
+ removeSql(cache, e.getKey());
+ else
+ mergeSql(cache, e.getKey(), e.getValue().val, e.getValue().updateCnt);
+ }
+ break;
+ }
+ default:
+ fail();
+ }
+ }
+
+ /** {@inheritDoc} */
+ protected void removeEntries(
+ TestCache<Integer, MvccTestAccount> cache,
+ Set<Integer> keys,
+ WriteMode writeMode) {
+ switch (writeMode) {
+ case PUT: {
+ for (Integer key : keys)
+ cache.cache.remove(key);
+
+ break;
+ }
+ case DML: {
+ for (Integer key : keys)
+ removeSql(cache, key);
+
+ break;
+ }
+ default:
+ fail();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void checkContains(TestCache<Integer, MvccTestAccount> cache, boolean expected,
+ Set<Integer> keys) {
+ for (Integer key : keys)
+ assertEquals(expected, cache.cache.containsKey(key));
+ }
+
+ /**
+  * Checks the values returned by getAndPut, getAndRemove and getAndReplace inside a
+  * pessimistic repeatable-read transaction, and that the resulting entries are the same
+  * when read in the SQL and GET modes, both before and after commit.
+  *
+  * @throws IgniteCheckedException If failed.
+  */
+ public void testGetAndUpdateOperations() throws IgniteCheckedException {
+     Ignite ignite = grid(0);
+
+     TestCache<Integer, MvccTestAccount> testCache = new TestCache<>(ignite.cache(DEFAULT_CACHE_NAME));
+
+     final Set<Integer> updKeys = new HashSet<>(3);
+     final Set<Integer> rmvKeys = new HashSet<>(3);
+
+     final Set<Integer> keys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), updKeys, rmvKeys);
+
+     // Only the keys that will be removed exist before the transaction starts.
+     final Map<Integer, MvccTestAccount> preloaded = rmvKeys.stream().collect(
+         Collectors.toMap(id -> id, id -> new MvccTestAccount(id, 1)));
+
+     // Expected final state: updated keys hold the last replaced value.
+     Map<Integer, MvccTestAccount> expected = updKeys.stream().collect(
+         Collectors.toMap(id -> id, id -> new MvccTestAccount(id, 3)));
+
+     testCache.cache.putAll(preloaded);
+
+     try (Transaction tx = ignite.transactions().txStart(
+         TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+         for (Integer k : updKeys) {
+             MvccTestAccount created = new MvccTestAccount(k, 1);
+
+             // Nothing stored yet, so the previous value is null.
+             assertNull(testCache.cache.getAndPut(k, created));
+
+             MvccTestAccount updated = new MvccTestAccount(k, 2);
+
+             // Second put returns the value written just above.
+             assertEquals(created, testCache.cache.getAndPut(k, updated));
+         }
+
+         for (Integer k : rmvKeys) {
+             // First removal returns the preloaded value.
+             assertEquals(preloaded.get(k), testCache.cache.getAndRemove(k));
+
+             // Second removal finds nothing.
+             assertNull(testCache.cache.getAndRemove(k));
+         }
+
+         for (Integer k : keys) {
+             MvccTestAccount prevExpected = new MvccTestAccount(k, 2);
+             MvccTestAccount replacement = new MvccTestAccount(k, 3);
+
+             if (!rmvKeys.contains(k))
+                 assertEquals(prevExpected, testCache.cache.getAndReplace(k, replacement)); // Replaces updated value.
+             else
+                 assertNull(testCache.cache.getAndReplace(k, replacement)); // Removed key: nothing to replace.
+         }
+
+         // State as seen from inside the transaction.
+         assertEquals(expected, getEntries(testCache, keys, SQL));
+         assertEquals(expected, getEntries(testCache, keys, GET));
+
+         tx.commit();
+     }
+
+     // State after commit.
+     assertEquals(expected, getEntries(testCache, keys, SQL));
+     assertEquals(expected, getEntries(testCache, keys, GET));
+ }
+
+ /**
+  * Checks putIfAbsent inside a pessimistic repeatable-read transaction: it must return
+  * {@code false} for keys that already hold a value and {@code true} for missing keys.
+  * The resulting entries are compared in SQL mode before commit and in both SQL and GET
+  * modes after commit.
+  *
+  * @throws IgniteCheckedException If failed.
+  */
+ public void testPutIfAbsentConsistency() throws IgniteCheckedException {
+     Ignite ignite = grid(0);
+
+     TestCache<Integer, MvccTestAccount> testCache = new TestCache<>(ignite.cache(DEFAULT_CACHE_NAME));
+
+     final Set<Integer> absentKeys = new HashSet<>(3);
+     final Set<Integer> presentKeys = new HashSet<>(3);
+
+     final Set<Integer> keys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), absentKeys, presentKeys);
+
+     // Entries that exist before the transaction starts.
+     final Map<Integer, MvccTestAccount> preloaded = presentKeys.stream().collect(
+         Collectors.toMap(id -> id, id -> new MvccTestAccount(id, 1)));
+
+     // Expected final state: every key holds the value built with 1.
+     Map<Integer, MvccTestAccount> expected = keys.stream().collect(
+         Collectors.toMap(id -> id, id -> new MvccTestAccount(id, 1)));
+
+     testCache.cache.putAll(preloaded);
+
+     try (Transaction tx = ignite.transactions().txStart(
+         TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+         for (Integer k : presentKeys) {
+             // Key already has a value, so nothing is written.
+             assertFalse(testCache.cache.putIfAbsent(k, new MvccTestAccount(k, 2)));
+         }
+
+         for (Integer k : absentKeys) {
+             // Key is missing, so the value is created.
+             assertTrue(testCache.cache.putIfAbsent(k, new MvccTestAccount(k, 1)));
+         }
+
+         // State as seen from inside the transaction.
+         assertEquals(expected, getEntries(testCache, keys, SQL));
+
+         tx.commit();
+     }
+
+     // State after commit.
+     assertEquals(expected, getEntries(testCache, keys, SQL));
+     assertEquals(expected, getEntries(testCache, keys, GET));
+ }
+
+ /**
+  * Checks replace(key, oldVal, newVal) and getAndReplace inside a pessimistic
+  * repeatable-read transaction: both must succeed for keys that hold a value and be
+  * no-ops for missing keys. The resulting entries are compared in SQL and GET modes,
+  * both before and after commit.
+  *
+  * @throws IgniteCheckedException If failed.
+  */
+ public void testReplaceConsistency() throws IgniteCheckedException {
+     Ignite ignite = grid(0);
+
+     TestCache<Integer, MvccTestAccount> testCache = new TestCache<>(ignite.cache(DEFAULT_CACHE_NAME));
+
+     final Set<Integer> presentKeys = new HashSet<>(3);
+     final Set<Integer> missingKeys = new HashSet<>(3);
+
+     final Set<Integer> keys = generateKeySet(grid(0).cache(DEFAULT_CACHE_NAME), presentKeys, missingKeys);
+
+     // Entries that exist before the transaction starts.
+     final Map<Integer, MvccTestAccount> preloaded = presentKeys.stream().collect(
+         Collectors.toMap(id -> id, id -> new MvccTestAccount(id, 1)));
+
+     // Expected final state: only the preloaded keys, holding the last replaced value.
+     Map<Integer, MvccTestAccount> expected = presentKeys.stream().collect(
+         Collectors.toMap(id -> id, id -> new MvccTestAccount(id, 3)));
+
+     testCache.cache.putAll(preloaded);
+
+     try (Transaction tx = ignite.transactions().txStart(
+         TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) {
+         for (Integer k : keys) {
+             MvccTestAccount intermediate = new MvccTestAccount(k, 2);
+
+             if (!presentKeys.contains(k)) {
+                 // Missing key: conditional replace fails.
+                 assertFalse(testCache.cache.replace(k, new MvccTestAccount(k, 1), intermediate));
+
+                 // Missing key: getAndReplace has no previous value.
+                 assertNull(testCache.cache.getAndReplace(k, new MvccTestAccount(k, 3)));
+             }
+             else {
+                 // Preloaded value matches, so the conditional replace succeeds.
+                 assertTrue(testCache.cache.replace(k, new MvccTestAccount(k, 1), intermediate));
+
+                 // getAndReplace returns the value written just above.
+                 assertEquals(intermediate, testCache.cache.getAndReplace(k, new MvccTestAccount(k, 3)));
+             }
+         }
+
+         // State as seen from inside the transaction.
+         assertEquals(expected, getEntries(testCache, keys, SQL));
+         assertEquals(expected, getEntries(testCache, keys, GET));
+
+         tx.commit();
+     }
+
+     // State after commit.
+     assertEquals(expected, getEntries(testCache, keys, SQL));
+     assertEquals(expected, getEntries(testCache, keys, GET));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
index 726c4e9..21ab2e6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
@@ -42,6 +42,8 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlConfiguratio
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlUpdateCountersTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlLockTimeoutTest;
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccStreamingInsertTest;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadBulkOpsTest;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadOperationsTest;
import org.apache.ignite.internal.processors.query.h2.GridIndexRebuildWithMvccEnabledSelfTest;
/**
@@ -64,6 +66,10 @@ public class IgniteCacheMvccSqlTestSuite extends TestSuite {
suite.addTestSuite(GridIndexRebuildWithMvccEnabledSelfTest.class);
+ // SQL vs CacheAPI consistency.
+ suite.addTestSuite(MvccRepeatableReadOperationsTest.class);
+ suite.addTestSuite(MvccRepeatableReadBulkOpsTest.class);
+
// JDBC tests.
suite.addTestSuite(CacheMvccSizeWithConcurrentJdbcTransactionTest.class);
suite.addTestSuite(CacheMvccScanQueryWithConcurrentJdbcTransactionTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
index 1f600dd..5b4106a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
@@ -81,9 +81,8 @@ namespace Apache.Ignite.Core.Tests.ApiParity
"TimeServerPortRange",
"IncludeProperties",
"isAutoActivationEnabled", // IGNITE-7301
- "isMvccEnabled", //TODO: IGNITE-9390: Remove when Mvcc support will be added.
- "MvccVacuumTimeInterval", //TODO: IGNITE-9390: Remove when Mvcc support will be added.
- "MvccVacuumThreadCnt" //TODO: IGNITE-9390: Remove when Mvcc support will be added.
+ "MvccVacuumFrequency", //TODO: IGNITE-9390: Remove when Mvcc support will be added.
+ "MvccVacuumThreadCount" //TODO: IGNITE-9390: Remove when Mvcc support will be added.
};
/// <summary>