You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/13 09:05:36 UTC
[03/21] ignite git commit: ignite-1607 WIP
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index dfe82d4..a620ee5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -19,9 +19,12 @@ package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
@@ -30,6 +33,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.integration.CacheLoaderException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteTransactions;
@@ -42,17 +47,22 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionOptimisticException;
+import org.jsr166.ConcurrentHashMap8;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -67,6 +77,12 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
+ private static final boolean FAST = true;
+
+ /** */
+ private static Map<Integer, Integer> storeMap = new ConcurrentHashMap8<>();
+
+ /** */
private static final int SRVS = 4;
/** */
@@ -79,42 +95,793 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(SRVS);
+
+ client = true;
+
+ startGridsMultiThreaded(SRVS, CLIENTS);
+
+ client = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 5 * 60_000;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxCommitReadOnly1() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertNull(val);
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertNull(val);
+
+ tx.rollback();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ cache.put(key, 1);
+
+ cache.remove(key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertNull(val);
+
+ tx.commit();
+ }
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxCommitReadOnly2() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (final Integer key : keys) {
+ log.info("Test key: " + key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertNull(val);
+
+ txAsync(cache, OPTIMISTIC, SERIALIZABLE,
+ new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+ @Override
+ public Void apply(IgniteCache<Integer, Integer> cache) {
+ cache.get(key);
+
+ return null;
+ }
+ }
+ );
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertNull(val);
+
+ txAsync(cache, PESSIMISTIC, REPEATABLE_READ,
+ new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+ @Override
+ public Void apply(IgniteCache<Integer, Integer> cache) {
+ cache.get(key);
+
+ return null;
+ }
+ }
+ );
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxCommitReadOnlyGetAll() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ Set<Integer> keys = new HashSet<>();
+
+ for (int i = 0; i < 100; i++)
+ keys.add(i);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Map<Integer, Integer> map = cache.getAll(keys);
+
+ assertTrue(map.isEmpty());
+
+ tx.commit();
+ }
+
+ for (Integer key : keys)
+ checkValue(key, null, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Map<Integer, Integer> map = cache.getAll(keys);
+
+ assertTrue(map.isEmpty());
+
+ tx.rollback();
+ }
+
+ for (Integer key : keys)
+ checkValue(key, null, cache.getName());
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictRead1() throws Exception {
+ txConflictRead(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictRead2() throws Exception {
+ txConflictRead(false);
+ }
+
+ /**
+ * @param noVal If {@code true} there is no cache value when read in tx.
+ * @throws Exception If failed.
+ */
+ private void txConflictRead(boolean noVal) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ Integer expVal = null;
+
+ if (!noVal) {
+ expVal = -1;
+
+ cache.put(key, expVal);
+ }
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertEquals(expVal, val);
+
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object val = cache.get(key);
+
+ assertEquals(1, val);
+
+ tx.commit();
+ }
+
+ checkValue(key, 1, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReadWrite1() throws Exception {
+ txConflictReadWrite(true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReadWrite2() throws Exception {
+ txConflictReadWrite(false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReadRemove1() throws Exception {
+ txConflictReadWrite(true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReadRemove2() throws Exception {
+ txConflictReadWrite(false, true);
+ }
+
+ /**
+ * @param noVal If {@code true} there is no cache value when read in tx.
+ * @param rmv If {@code true} tests remove, otherwise put.
+ * @throws Exception If failed.
+ */
+ private void txConflictReadWrite(boolean noVal, boolean rmv) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ Integer expVal = null;
+
+ if (!noVal) {
+ expVal = -1;
+
+ cache.put(key, expVal);
+ }
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertEquals(expVal, val);
+
+ updateKey(cache, key, 1);
+
+ if (rmv)
+ cache.remove(key);
+ else
+ cache.put(key, 2);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertEquals(1, (Object) val);
+
+ if (rmv)
+ cache.remove(key);
+ else
+ cache.put(key, 2);
+
+ tx.commit();
+ }
+
+ checkValue(key, rmv ? null : 2, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictGetAndPut1() throws Exception {
+ txConflictGetAndPut(true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictGetAndPut2() throws Exception {
+ txConflictGetAndPut(false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictGetAndRemove1() throws Exception {
+ txConflictGetAndPut(true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictGetAndRemove2() throws Exception {
+ txConflictGetAndPut(false, true);
+ }
+
+ /**
+ * @param noVal If {@code true} there is no cache value when read in tx.
+ * @param rmv If {@code true} tests remove, otherwise put.
+ * @throws Exception If failed.
+ */
+ private void txConflictGetAndPut(boolean noVal, boolean rmv) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ Integer expVal = null;
+
+ if (!noVal) {
+ expVal = -1;
+
+ cache.put(key, expVal);
+ }
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = rmv ? cache.getAndRemove(key) : cache.getAndPut(key, 2);
+
+ assertEquals(expVal, val);
+
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object val = rmv ? cache.getAndRemove(key) : cache.getAndPut(key, 2);
+
+ assertEquals(1, val);
+
+ tx.commit();
+ }
+
+ checkValue(key, rmv ? null : 2, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictInvoke1() throws Exception {
+ txConflictInvoke(true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictInvoke2() throws Exception {
+ txConflictInvoke(false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictInvoke3() throws Exception {
+ txConflictInvoke(true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictInvoke4() throws Exception {
+ txConflictInvoke(false, true);
+ }
+
+ /**
+ * @param noVal If {@code true} there is no cache value when read in tx.
+ * @param rmv If {@code true} the invoke removes the value, otherwise it puts a new value.
+ * @throws Exception If failed.
+ */
+ private void txConflictInvoke(boolean noVal, boolean rmv) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ Integer expVal = null;
+
+ if (!noVal) {
+ expVal = -1;
+
+ cache.put(key, expVal);
+ }
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.invoke(key, new SetValueProcessor(rmv ? null : 2));
+
+ assertEquals(expVal, val);
+
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object val = cache.invoke(key, new SetValueProcessor(rmv ? null : 2));
+
+ assertEquals(1, val);
+
+ tx.commit();
+ }
+
+ checkValue(key, rmv ? null : 2, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictPutIfAbsent() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean put = cache.putIfAbsent(key, 2);
+
+ assertTrue(put);
+
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean put = cache.putIfAbsent(key, 2);
+
+ assertFalse(put);
+
+ tx.commit();
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ cache.remove(key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean put = cache.putIfAbsent(key, 2);
+
+ assertTrue(put);
+
+ tx.commit();
+ }
+
+ checkValue(key, 2, cache.getName());
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean put = cache.putIfAbsent(key, 2);
+
+ assertFalse(put);
+
+ updateKey(cache, key, 3);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 3, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReplace() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (final Integer key : keys) {
+ log.info("Test key: " + key);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 2);
+
+ assertFalse(replace);
+
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 2);
+
+ assertTrue(replace);
+
+ tx.commit();
+ }
+
+ checkValue(key, 2, cache.getName());
+
+ cache.remove(key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 2);
+
+ assertFalse(replace);
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 2);
+
+ assertFalse(replace);
+
+ updateKey(cache, key, 3);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 3, cache.getName());
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 2);
+
+ assertTrue(replace);
- cfg.setClientMode(client);
+ txAsync(cache, OPTIMISTIC, SERIALIZABLE,
+ new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+ @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+ cache.remove(key);
- return cfg;
- }
+ return null;
+ }
+ }
+ );
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
+ tx.commit();
+ }
- startGridsMultiThreaded(SRVS);
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
- client = true;
+ checkValue(key, null, cache.getName());
- startGridsMultiThreaded(SRVS, CLIENTS);
+ cache.put(key, 1);
- client = false;
- }
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 2);
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- super.afterTestsStopped();
+ assertTrue(replace);
- stopAllGrids();
- }
+ tx.commit();
+ }
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return 5 * 60_000;
+ checkValue(key, 2, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
}
/**
* @throws Exception If failed.
*/
- public void testTxCommitReadOnly() throws Exception {
+ public void testTxConflictGetAndReplace() throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
@@ -125,99 +892,86 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
- List<Integer> keys = new ArrayList<>();
+ List<Integer> keys = testKeys(cache);
- keys.add(nearKey(cache));
- keys.add(primaryKey(cache));
+ for (final Integer key : keys) {
+ log.info("Test key: " + key);
- if (ccfg.getBackups() != 0)
- keys.add(backupKey(cache));
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object old = cache.getAndReplace(key, 2);
- for (Integer key : keys) {
- log.info("Test key: " + key);
+ assertNull(old);
- try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
- Integer val = cache.get(key);
+ updateKey(cache, key, 1);
- assertNull(val);
+ tx.commit();
+ }
- tx.commit();
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
}
- checkValue(key, null, ccfg.getName());
+ checkValue(key, 1, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
- Integer val = cache.get(key);
+ Object old = cache.getAndReplace(key, 2);
- assertNull(val);
+ assertEquals(1, old);
- tx.rollback();
+ tx.commit();
}
- checkValue(key, null, ccfg.getName());
- }
- }
- finally {
- ignite0.destroyCache(ccfg.getName());
- }
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxRollbackRead1() throws Exception {
- txRollbackRead(true);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxRollbackRead2() throws Exception {
- txRollbackRead(false);
- }
-
- /**
- * @param noVal If {@code true} there is no cache value when read in tx.
- * @throws Exception If failed.
- */
- private void txRollbackRead(boolean noVal) throws Exception {
- Ignite ignite0 = ignite(0);
-
- final IgniteTransactions txs = ignite0.transactions();
+ checkValue(key, 2, cache.getName());
- List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>();
+ cache.remove(key);
- ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false));
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object old = cache.getAndReplace(key, 2);
- for (CacheConfiguration<Integer, Integer> ccfg : ccfgs) {
- logCacheInfo(ccfg);
+ assertNull(old);
- try {
- IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+ tx.commit();
+ }
- List<Integer> keys = new ArrayList<>();
+ checkValue(key, null, cache.getName());
- keys.add(nearKey(cache));
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object old = cache.getAndReplace(key, 2);
- for (Integer key : keys) {
- log.info("Test key: " + key);
+ assertNull(old);
- Integer expVal = null;
+ updateKey(cache, key, 3);
- if (!noVal) {
- expVal = -1;
+ tx.commit();
+ }
- cache.put(key, expVal);
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
}
+ checkValue(key, 3, cache.getName());
+
try {
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
- Integer val = cache.get(key);
+ Object old = cache.getAndReplace(key, 2);
- assertEquals(expVal, val);
+ assertEquals(3, old);
- updateKey(cache, key, 1);
+ txAsync(cache, OPTIMISTIC, SERIALIZABLE,
+ new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+ @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+ cache.remove(key);
+
+ return null;
+ }
+ }
+ );
tx.commit();
}
@@ -228,21 +982,23 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
log.info("Expected exception: " + e);
}
- assertEquals(1, (Object)cache.get(key));
+ checkValue(key, null, cache.getName());
+
+ cache.put(key, 1);
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
- Object val = cache.get(key);
+ Object old = cache.getAndReplace(key, 2);
- assertEquals(1, val);
+ assertEquals(1, old);
tx.commit();
}
- assertEquals(1, (Object)cache.get(key));
+ checkValue(key, 2, cache.getName());
}
}
finally {
- ignite0.destroyCache(ccfg.getName());
+ destroyCache(ignite0, ccfg.getName());
}
}
}
@@ -250,46 +1006,113 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void _testTxRollbackReadWrite() throws Exception {
+ public void testTxNoConflictPut1() throws Exception {
+ txNoConflictUpdate(true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoConflictPut2() throws Exception {
+ txNoConflictUpdate(false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoConflictRemove1() throws Exception {
+ txNoConflictUpdate(true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoConflictRemove2() throws Exception {
+ txNoConflictUpdate(false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ * @param noVal If {@code true} there is no cache value when the update is done in tx.
+ * @param rmv If {@code true} tests remove, otherwise put.
+ */
+ private void txNoConflictUpdate(boolean noVal, boolean rmv) throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
- final IgniteCache<Integer, Integer> cache =
- ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
- final Integer key = nearKey(cache);
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
- try {
- try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
- Integer val = cache.get(key);
+ List<Integer> keys = testKeys(cache);
- assertNull(val);
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
- updateKey(cache, key, 1);
+ if (!noVal)
+ cache.put(key, -1);
- cache.put(key, 2);
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ if (rmv)
+ cache.remove(key);
+ else
+ cache.put(key, 2);
- log.info("Commit");
+ updateKey(cache, key, 1);
- tx.commit();
- }
+ tx.commit();
+ }
- fail();
- }
- catch (TransactionOptimisticException e) {
- log.info("Expected exception: " + e);
- }
+ checkValue(key, rmv ? null : 2, cache.getName());
- assertEquals(1, (Object)cache.get(key));
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.put(key, 3);
- try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
- cache.put(key, 2);
+ tx.commit();
+ }
- tx.commit();
- }
+ checkValue(key, 3, cache.getName());
+ }
+
+ Map<Integer, Integer> map = new HashMap<>();
+
+ for (int i = 0; i < 100; i++)
+ map.put(i, i);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ if (rmv)
+ cache.removeAll(map.keySet());
+ else
+ cache.putAll(map);
+
+ txAsync(cache, PESSIMISTIC, REPEATABLE_READ,
+ new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+ @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+ Map<Integer, Integer> map = new HashMap<>();
+
+ for (int i = 0; i < 100; i++)
+ map.put(i, -1);
+
+ cache.putAll(map);
- assertEquals(2, (Object) cache.get(key));
+ return null;
+ }
+ }
+ );
+
+ tx.commit();
+ }
+
+ for (int i = 0; i < 100; i++)
+ checkValue(i, rmv ? null : i, cache.getName());
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
}
/**
@@ -306,43 +1129,47 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
try {
IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
- final Integer key = nearKey(cache);
+ List<Integer> keys = testKeys(cache);
- CountDownLatch latch = new CountDownLatch(1);
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
- IgniteInternalFuture<?> fut = lockKey(latch, cache, key);
+ CountDownLatch latch = new CountDownLatch(1);
- try {
- try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
- cache.put(key, 2);
+ IgniteInternalFuture<?> fut = lockKey(latch, cache, key);
- log.info("Commit");
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.put(key, 2);
- tx.commit();
+ log.info("Commit");
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
}
- fail();
- }
- catch (TransactionOptimisticException e) {
- log.info("Expected exception: " + e);
- }
+ latch.countDown();
- latch.countDown();
+ fut.get();
- fut.get();
+ checkValue(key, 1, cache.getName());
- assertEquals(1, (Object)cache.get(key));
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.put(key, 2);
- try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
- cache.put(key, 2);
+ tx.commit();
+ }
- tx.commit();
+ checkValue(key, 2, cache.getName());
}
-
- assertEquals(2, (Object)cache.get(key));
}
finally {
- ignite0.destroyCache(ccfg.getName());
+ destroyCache(ignite0, ccfg.getName());
}
}
}
@@ -365,7 +1192,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
* @param locKey If {@code true} gets lock for local key.
* @throws Exception If failed.
*/
- public void rollbackIfLockedPartialLock(boolean locKey) throws Exception {
+ private void rollbackIfLockedPartialLock(boolean locKey) throws Exception {
Ignite ignite0 = ignite(0);
final IgniteTransactions txs = ignite0.transactions();
@@ -388,8 +1215,6 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
cache.put(key1, 2);
cache.put(key2, 2);
- log.info("Commit2");
-
tx.commit();
}
@@ -403,23 +1228,21 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
fut.get();
- assertEquals(1, (Object) cache.get(key1));
- assertNull(cache.get(key2));
+ checkValue(key1, 1, cache.getName());
+ checkValue(key2, null, cache.getName());
try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(key1, 2);
cache.put(key2, 2);
- log.info("Commit3");
-
tx.commit();
}
- assertEquals(2, (Object) cache.get(key2));
- assertEquals(2, (Object) cache.get(key2));
+ checkValue(key1, 2, cache.getName());
+ checkValue(key2, 2, cache.getName());
}
finally {
- ignite0.destroyCache(ccfg.getName());
+ destroyCache(ignite0, ccfg.getName());
}
}
}
@@ -511,7 +1334,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
});
}
- for (int i = 0; i < 10; i++) {
+ int ITERS = FAST ? 1 : 10;
+
+ for (int i = 0; i < ITERS; i++) {
log.info("Iteration: " + i);
final long stopTime = U.currentTimeMillis() + 10_000;
@@ -594,7 +1419,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- ignite(1).destroyCache(cacheName);
+ destroyCache(ignite(1), cacheName);
}
}
@@ -608,11 +1433,13 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false));
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, false, false));
+ ccfgs.add(cacheConfiguration(REPLICATED, FULL_SYNC, 0, false, false));
// Store, no near.
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, true, false));
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false));
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, true, false));
+ ccfgs.add(cacheConfiguration(REPLICATED, FULL_SYNC, 0, true, false));
// No store, near.
ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, true));
@@ -640,32 +1467,74 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
/**
* @param cache Cache.
- * @param key Key.
- * @param val Value.
+ * @return Test keys.
* @throws Exception If failed.
*/
- private void updateKey(
- final IgniteCache<Integer, Integer> cache,
- final Integer key,
- final Integer val) throws Exception {
+ private List<Integer> testKeys(IgniteCache<Integer, Integer> cache) throws Exception {
+ CacheConfiguration ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+ List<Integer> keys = new ArrayList<>();
+
+ if (ccfg.getCacheMode() == PARTITIONED)
+ keys.add(nearKey(cache));
+
+ keys.add(primaryKey(cache));
+
+ if (ccfg.getBackups() != 0)
+ keys.add(backupKey(cache));
+
+ return keys;
+ }
+
+ /**
+ * @param cache Cache.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @param c Closure to run in transaction.
+ * @throws Exception If failed.
+ */
+ private void txAsync(final IgniteCache<Integer, Integer> cache,
+ final TransactionConcurrency concurrency,
+ final TransactionIsolation isolation,
+ final IgniteClosure<IgniteCache<Integer, Integer>, Void> c) throws Exception {
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
+ @Override
+ public Void call() throws Exception {
IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
- try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
- cache.put(key, val);
+ try (Transaction tx = txs.txStart(concurrency, isolation)) {
+ c.apply(cache);
tx.commit();
}
return null;
}
- }, "update-thread");
+ }, "async-thread");
fut.get();
}
/**
+ * @param cache Cache.
+ * @param key Key.
+ * @param val Value.
+ * @throws Exception If failed.
+ */
+ private void updateKey(
+ final IgniteCache<Integer, Integer> cache,
+ final Integer key,
+ final Integer val) throws Exception {
+ txAsync(cache, PESSIMISTIC, REPEATABLE_READ, new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+ @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+ cache.put(key, val);
+
+ return null;
+ }
+ });
+ }
+
+ /**
* @param key Key.
* @param expVal Expected value.
* @param cacheName Cache name.
@@ -719,6 +1588,16 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
/**
+ * @param ignite Node.
+ * @param cacheName Cache name.
+ */
+ private void destroyCache(Ignite ignite, String cacheName) {
+ storeMap.clear();
+
+ ignite.destroyCache(cacheName);
+ }
+
+ /**
* @param cacheMode Cache mode.
* @param syncMode Write synchronization mode.
* @param backups Number of backups.
@@ -736,9 +1615,11 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
ccfg.setCacheMode(cacheMode);
ccfg.setAtomicityMode(TRANSACTIONAL);
- ccfg.setBackups(backups);
ccfg.setWriteSynchronizationMode(syncMode);
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(backups);
+
if (storeEnabled) {
ccfg.setCacheStoreFactory(new TestStoreFactory());
ccfg.setWriteThrough(true);
@@ -759,17 +1640,44 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
@Override public CacheStore<Integer, Integer> create() {
return new CacheStoreAdapter<Integer, Integer>() {
@Override public Integer load(Integer key) throws CacheLoaderException {
- return null;
+ return storeMap.get(key);
}
@Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) {
- // No-op.
+ storeMap.put(entry.getKey(), entry.getValue());
}
@Override public void delete(Object key) {
- // No-op.
+ storeMap.remove(key);
}
};
}
}
+
+ /**
+ * Sets given value, returns old value.
+ */
+ public static final class SetValueProcessor implements EntryProcessor<Integer, Integer, Integer> {
+ /** */
+ private Integer newVal;
+
+ /**
+ * @param newVal New value to set.
+ */
+ SetValueProcessor(Integer newVal) {
+ this.newVal = newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments) {
+ Integer val = entry.getValue();
+
+ if (newVal == null)
+ entry.remove();
+ else
+ entry.setValue(newVal);
+
+ return val;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6849ebe1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index e46c139..6b2a6c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -442,7 +442,9 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
}
/** @inheritDoc */
- @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(boolean readSwap,
+ @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+ IgniteInternalTx tx,
+ boolean readSwap,
boolean unmarshal,
boolean updateMetrics,
boolean evt,