You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by an...@apache.org on 2015/10/28 10:30:42 UTC
[39/46] ignite git commit: ignite-1607 Implemented deadlock-free
optimistic serializable tx mode
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
new file mode 100644
index 0000000..70ddfa0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -0,0 +1,4295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionOptimisticException;
+import org.jsr166.ConcurrentHashMap8;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.testframework.GridTestUtils.TestMemoryMode;
+import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final boolean FAST = false;
+
+ /** */
+ private static Map<Integer, Integer> storeMap = new ConcurrentHashMap8<>();
+
+ /** */
+ private static final int SRVS = 4;
+
+ /** */
+ private static final int CLIENTS = 3;
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setClientMode(client);
+
+ cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(SRVS);
+
+ client = true;
+
+ startGridsMultiThreaded(SRVS, CLIENTS);
+
+ client = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 5 * 60_000;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxStreamerLoad() throws Exception {
+ txStreamerLoad(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxStreamerLoadAllowOverwrite() throws Exception {
+ txStreamerLoad(true);
+ }
+
+ /**
+ * @param allowOverwrite Streamer flag.
+ * @throws Exception If failed.
+ */
+ private void txStreamerLoad(boolean allowOverwrite) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ if (ccfg.getCacheStoreFactory() == null)
+ continue;
+
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys)
+ txStreamerLoad(ignite0, key, cache.getName(), allowOverwrite);
+
+ txStreamerLoad(ignite(SRVS), 10_000, cache.getName(), allowOverwrite);
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @param ignite Node.
+ * @param key Key.
+ * @param cacheName Cache name.
+ * @param allowOverwrite Streamer flag.
+ * @throws Exception If failed.
+ */
+ private void txStreamerLoad(Ignite ignite,
+ Integer key,
+ String cacheName,
+ boolean allowOverwrite) throws Exception {
+ IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
+
+ log.info("Test key: " + key);
+
+ Integer loadVal = -1;
+
+ IgniteTransactions txs = ignite.transactions();
+
+ try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(cache.getName())) {
+ streamer.allowOverwrite(allowOverwrite);
+
+ streamer.addData(key, loadVal);
+ }
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertEquals(loadVal, val);
+
+ tx.commit();
+ }
+
+ checkValue(key, loadVal, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertEquals(loadVal, val);
+
+ cache.put(key, 0);
+
+ tx.commit();
+ }
+
+ checkValue(key, 0, cache.getName());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxLoadFromStore() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ if (ccfg.getCacheStoreFactory() == null)
+ continue;
+
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ Integer storeVal = -1;
+
+ storeMap.put(key, storeVal);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertEquals(storeVal, val);
+
+ tx.commit();
+ }
+
+ checkValue(key, storeVal, cache.getName());
+
+ cache.remove(key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertNull(val);
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxCommitReadOnly1() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertNull(val);
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertNull(val);
+
+ tx.rollback();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ cache.put(key, 1);
+
+ cache.remove(key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertNull(val);
+
+ tx.commit();
+ }
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxCommitReadOnly2() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (final Integer key : keys) {
+ log.info("Test key: " + key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertNull(val);
+
+ txAsync(cache, OPTIMISTIC, SERIALIZABLE,
+ new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+ @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+ cache.get(key);
+
+ return null;
+ }
+ }
+ );
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertNull(val);
+
+ txAsync(cache, PESSIMISTIC, REPEATABLE_READ,
+ new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+ @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+ cache.get(key);
+
+ return null;
+ }
+ }
+ );
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxCommit() throws Exception {
+ Ignite ignite0 = ignite(0);
+ Ignite ignite1 = ignite(1);
+
+ final IgniteTransactions txs0 = ignite0.transactions();
+ final IgniteTransactions txs1 = ignite1.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache0 = ignite0.createCache(ccfg);
+ IgniteCache<Integer, Integer> cache1 = ignite1.cache(ccfg.getName());
+
+ List<Integer> keys = testKeys(cache0);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ Integer expVal = null;
+
+ for (int i = 0; i < 100; i++) {
+ try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache0.get(key);
+
+ assertEquals(expVal, val);
+
+ cache0.put(key, i);
+
+ tx.commit();
+
+ expVal = i;
+ }
+
+ try (Transaction tx = txs1.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache1.get(key);
+
+ assertEquals(expVal, val);
+
+ cache1.put(key, val);
+
+ tx.commit();
+ }
+
+ try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache0.get(key);
+
+ assertEquals(expVal, val);
+
+ cache0.put(key, val);
+
+ tx.commit();
+ }
+ }
+
+ checkValue(key, expVal, cache0.getName());
+
+ cache0.remove(key);
+
+ try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache0.get(key);
+
+ assertNull(val);
+
+ cache0.put(key, expVal + 1);
+
+ tx.commit();
+ }
+
+ checkValue(key, expVal + 1, cache0.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxRollback() throws Exception {
+ Ignite ignite0 = ignite(0);
+ Ignite ignite1 = ignite(1);
+
+ final IgniteTransactions txs0 = ignite0.transactions();
+ final IgniteTransactions txs1 = ignite1.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache0 = ignite0.createCache(ccfg);
+ IgniteCache<Integer, Integer> cache1 = ignite1.cache(ccfg.getName());
+
+ List<Integer> keys = testKeys(cache0);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ Integer expVal = null;
+
+ for (int i = 0; i < 100; i++) {
+ try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache0.get(key);
+
+ assertEquals(expVal, val);
+
+ cache0.put(key, i);
+
+ tx.rollback();
+ }
+
+ try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache0.get(key);
+
+ assertEquals(expVal, val);
+
+ cache0.put(key, i);
+
+ tx.commit();
+
+ expVal = i;
+ }
+
+ try (Transaction tx = txs1.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache1.get(key);
+
+ assertEquals(expVal, val);
+
+ cache1.put(key, val);
+
+ tx.commit();
+ }
+ }
+
+ checkValue(key, expVal, cache0.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxCommitReadOnlyGetAll() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ Set<Integer> keys = new HashSet<>();
+
+ for (int i = 0; i < 100; i++)
+ keys.add(i);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Map<Integer, Integer> map = cache.getAll(keys);
+
+ assertTrue(map.isEmpty());
+
+ tx.commit();
+ }
+
+ for (Integer key : keys)
+ checkValue(key, null, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Map<Integer, Integer> map = cache.getAll(keys);
+
+ assertTrue(map.isEmpty());
+
+ tx.rollback();
+ }
+
+ for (Integer key : keys)
+ checkValue(key, null, cache.getName());
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxCommitReadWriteTwoNodes() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ Integer key0 = primaryKey(ignite(0).cache(null));
+ Integer key1 = primaryKey(ignite(1).cache(null));
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.put(key0, key0);
+
+ cache.get(key1);
+
+ tx.commit();
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictRead1() throws Exception {
+ txConflictRead(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictRead2() throws Exception {
+ txConflictRead(false);
+ }
+
+ /**
+ * @param noVal If {@code true} there is no cache value when read in tx.
+ * @throws Exception If failed.
+ */
+ private void txConflictRead(boolean noVal) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ Integer expVal = null;
+
+ if (!noVal) {
+ expVal = -1;
+
+ cache.put(key, expVal);
+ }
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertEquals(expVal, val);
+
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object val = cache.get(key);
+
+ assertEquals(1, val);
+
+ tx.commit();
+ }
+
+ checkValue(key, 1, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReadWrite1() throws Exception {
+ txConflictReadWrite(true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReadWrite2() throws Exception {
+ txConflictReadWrite(false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReadRemove1() throws Exception {
+ txConflictReadWrite(true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReadRemove2() throws Exception {
+ txConflictReadWrite(false, true);
+ }
+
+ /**
+ * @param noVal If {@code true} there is no cache value when read in tx.
+ * @param rmv If {@code true} tests remove, otherwise put.
+ * @throws Exception If failed.
+ */
+ private void txConflictReadWrite(boolean noVal, boolean rmv) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ Integer expVal = null;
+
+ if (!noVal) {
+ expVal = -1;
+
+ cache.put(key, expVal);
+ }
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertEquals(expVal, val);
+
+ updateKey(cache, key, 1);
+
+ if (rmv)
+ cache.remove(key);
+ else
+ cache.put(key, 2);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.get(key);
+
+ assertEquals(1, (Object) val);
+
+ if (rmv)
+ cache.remove(key);
+ else
+ cache.put(key, 2);
+
+ tx.commit();
+ }
+
+ checkValue(key, rmv ? null : 2, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testTxConflictGetAndPut1() throws Exception {
+ txConflictGetAndPut(true, false);
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testTxConflictGetAndPut2() throws Exception {
+ txConflictGetAndPut(false, false);
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testTxConflictGetAndRemove1() throws Exception {
+ txConflictGetAndPut(true, true);
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testTxConflictGetAndRemove2() throws Exception {
+ txConflictGetAndPut(false, true);
+ }
+
+ /**
+ * @param noVal If {@code true} there is no cache value when read in tx.
+ * @param rmv If {@code true} tests remove, otherwise put.
+ * @throws Exception If failed.
+ */
+ private void txConflictGetAndPut(boolean noVal, boolean rmv) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ Integer expVal = null;
+
+ if (!noVal) {
+ expVal = -1;
+
+ cache.put(key, expVal);
+ }
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = rmv ? cache.getAndRemove(key) : cache.getAndPut(key, 2);
+
+ assertEquals(expVal, val);
+
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object val = rmv ? cache.getAndRemove(key) : cache.getAndPut(key, 2);
+
+ assertEquals(1, val);
+
+ tx.commit();
+ }
+
+ checkValue(key, rmv ? null : 2, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testTxConflictInvoke1() throws Exception {
+ txConflictInvoke(true, false);
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testTxConflictInvoke2() throws Exception {
+ txConflictInvoke(false, false);
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testTxConflictInvoke3() throws Exception {
+ txConflictInvoke(true, true);
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testTxConflictInvoke4() throws Exception {
+ txConflictInvoke(false, true);
+ }
+
+ /**
+ * @param noVal If {@code true} there is no cache value when read in tx.
+ * @param rmv If {@code true} invoke does remove value, otherwise put.
+ * @throws Exception If failed.
+ */
+ private void txConflictInvoke(boolean noVal, boolean rmv) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ Integer expVal = null;
+
+ if (!noVal) {
+ expVal = -1;
+
+ cache.put(key, expVal);
+ }
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Integer val = cache.invoke(key, new SetValueProcessor(rmv ? null : 2));
+
+ assertEquals(expVal, val);
+
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object val = cache.invoke(key, new SetValueProcessor(rmv ? null : 2));
+
+ assertEquals(1, val);
+
+ tx.commit();
+ }
+
+ checkValue(key, rmv ? null : 2, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed
+ */
+ public void testTxConflictInvokeAll() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache0 = ignite0.createCache(ccfg);
+
+ final Integer key1 = primaryKey(ignite(0).cache(cache0.getName()));
+ final Integer key2 = primaryKey(ignite(1).cache(cache0.getName()));
+
+ Map<Integer, Integer> vals = new HashMap<>();
+
+ int newVal = 0;
+
+ for (Ignite ignite : G.allGrids()) {
+ log.info("Test node: " + ignite.name());
+
+ IgniteTransactions txs = ignite.transactions();
+
+ IgniteCache<Integer, Integer> cache = ignite.cache(cache0.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Map<Integer, EntryProcessorResult<Integer>> res =
+ cache.invokeAll(F.asSet(key1, key2), new SetValueProcessor(newVal));
+
+ if (!vals.isEmpty()) {
+ EntryProcessorResult<Integer> res1 = res.get(key1);
+
+ assertNotNull(res1);
+ assertEquals(vals.get(key1), res1.get());
+
+ EntryProcessorResult<Integer> res2 = res.get(key2);
+
+ assertNotNull(res2);
+ assertEquals(vals.get(key2), res2.get());
+ }
+ else
+ assertTrue(res.isEmpty());
+
+ tx.commit();
+ }
+
+ checkValue(key1, newVal, cache.getName());
+ checkValue(key2, newVal, cache.getName());
+
+ vals.put(key1, newVal);
+ vals.put(key2, newVal);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Map<Integer, EntryProcessorResult<Integer>> res =
+ cache.invokeAll(F.asSet(key1, key2), new SetValueProcessor(newVal + 1));
+
+ EntryProcessorResult<Integer> res1 = res.get(key1);
+
+ assertNotNull(res1);
+ assertEquals(vals.get(key1), res1.get());
+
+ EntryProcessorResult<Integer> res2 = res.get(key2);
+
+ assertNotNull(res2);
+ assertEquals(vals.get(key2), res2.get());
+
+ updateKey(cache0, key1, -1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key1, -1, cache.getName());
+ checkValue(key2, newVal, cache.getName());
+
+ vals.put(key1, -1);
+ vals.put(key2, newVal);
+
+ newVal++;
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictPutIfAbsent() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean put = cache.putIfAbsent(key, 2);
+
+ assertTrue(put);
+
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean put = cache.putIfAbsent(key, 2);
+
+ assertFalse(put);
+
+ tx.commit();
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ cache.remove(key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean put = cache.putIfAbsent(key, 2);
+
+ assertTrue(put);
+
+ tx.commit();
+ }
+
+ checkValue(key, 2, cache.getName());
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean put = cache.putIfAbsent(key, 2);
+
+ assertFalse(put);
+
+ updateKey(cache, key, 3);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 3, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictGetAndPutIfAbsent() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object old = cache.getAndPutIfAbsent(key, 2);
+
+ assertNull(old);
+
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object old = cache.getAndPutIfAbsent(key, 2);
+
+ assertEquals(1, old);
+
+ tx.commit();
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ cache.remove(key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object old = cache.getAndPutIfAbsent(key, 2);
+
+ assertNull(old);
+
+ tx.commit();
+ }
+
+ checkValue(key, 2, cache.getName());
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object old = cache.getAndPutIfAbsent(key, 4);
+
+ assertEquals(2, old);
+
+ updateKey(cache, key, 3);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 3, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReplace() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (final Integer key : keys) {
+ log.info("Test key: " + key);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 2);
+
+ assertFalse(replace);
+
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 2);
+
+ assertTrue(replace);
+
+ tx.commit();
+ }
+
+ checkValue(key, 2, cache.getName());
+
+ cache.remove(key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 2);
+
+ assertFalse(replace);
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 2);
+
+ assertFalse(replace);
+
+ updateKey(cache, key, 3);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 3, cache.getName());
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 2);
+
+ assertTrue(replace);
+
+ txAsync(cache, OPTIMISTIC, SERIALIZABLE,
+ new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+ @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+ cache.remove(key);
+
+ return null;
+ }
+ }
+ );
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, null, cache.getName());
+
+ cache.put(key, 1);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 2);
+
+ assertTrue(replace);
+
+ tx.commit();
+ }
+
+ checkValue(key, 2, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictGetAndReplace() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (final Integer key : keys) {
+ log.info("Test key: " + key);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object old = cache.getAndReplace(key, 2);
+
+ assertNull(old);
+
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object old = cache.getAndReplace(key, 2);
+
+ assertEquals(1, old);
+
+ tx.commit();
+ }
+
+ checkValue(key, 2, cache.getName());
+
+ cache.remove(key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object old = cache.getAndReplace(key, 2);
+
+ assertNull(old);
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object old = cache.getAndReplace(key, 2);
+
+ assertNull(old);
+
+ updateKey(cache, key, 3);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 3, cache.getName());
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object old = cache.getAndReplace(key, 2);
+
+ assertEquals(3, old);
+
+ txAsync(cache, OPTIMISTIC, SERIALIZABLE,
+ new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+ @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+ cache.remove(key);
+
+ return null;
+ }
+ }
+ );
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, null, cache.getName());
+
+ cache.put(key, 1);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object old = cache.getAndReplace(key, 2);
+
+ assertEquals(1, old);
+
+ tx.commit();
+ }
+
+ checkValue(key, 2, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictRemoveWithOldValue() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (final Integer key : keys) {
+ log.info("Test key: " + key);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean rmv = cache.remove(key, 2);
+
+ assertFalse(rmv);
+
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean rmv = cache.remove(key, 1);
+
+ assertTrue(rmv);
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ cache.remove(key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean rmv = cache.remove(key, 2);
+
+ assertFalse(rmv);
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ cache.put(key, 2);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean rmv = cache.remove(key, 2);
+
+ assertTrue(rmv);
+
+ updateKey(cache, key, 3);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 3, cache.getName());
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean rmv = cache.remove(key, 3);
+
+ assertTrue(rmv);
+
+ txAsync(cache, OPTIMISTIC, SERIALIZABLE,
+ new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+ @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+ cache.remove(key);
+
+ return null;
+ }
+ }
+ );
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, null, cache.getName());
+
+ cache.put(key, 1);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean rmv = cache.remove(key, 2);
+
+ assertFalse(rmv);
+
+ tx.commit();
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean rmv = cache.remove(key, 1);
+
+ assertTrue(rmv);
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictCasReplace() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (final Integer key : keys) {
+ log.info("Test key: " + key);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 1, 2);
+
+ assertFalse(replace);
+
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 1, 2);
+
+ assertTrue(replace);
+
+ tx.commit();
+ }
+
+ checkValue(key, 2, cache.getName());
+
+ cache.remove(key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 1, 2);
+
+ assertFalse(replace);
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ cache.put(key, 2);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 2, 1);
+
+ assertTrue(replace);
+
+ updateKey(cache, key, 3);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, 3, cache.getName());
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 3, 4);
+
+ assertTrue(replace);
+
+ txAsync(cache, OPTIMISTIC, SERIALIZABLE,
+ new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+ @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+ cache.remove(key);
+
+ return null;
+ }
+ }
+ );
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, null, cache.getName());
+
+ cache.put(key, 1);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 2, 3);
+
+ assertFalse(replace);
+
+ tx.commit();
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean replace = cache.replace(key, 1, 3);
+
+ assertTrue(replace);
+
+ tx.commit();
+ }
+
+ checkValue(key, 3, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictRemoveReturnBoolean1() throws Exception {
+ txConflictRemoveReturnBoolean(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictRemoveReturnBoolean2() throws Exception {
+ txConflictRemoveReturnBoolean(true);
+ }
+
+ /**
+ * @param noVal If {@code true} there is no cache value when do update in tx.
+ * @throws Exception If failed.
+ */
+ private void txConflictRemoveReturnBoolean(boolean noVal) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (final Integer key : keys) {
+ log.info("Test key: " + key);
+
+ if (!noVal)
+ cache.put(key, -1);
+
+ if (noVal) {
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean res = cache.remove(key);
+
+ assertFalse(res);
+
+ updateKey(cache, key, -1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, -1, cache.getName());
+ }
+ else {
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean res = cache.remove(key);
+
+ assertTrue(res);
+
+ txAsync(cache, PESSIMISTIC, REPEATABLE_READ,
+ new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+ @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+ cache.remove(key);
+
+ return null;
+ }
+ }
+ );
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key, null, cache.getName());
+
+ cache.put(key, -1);
+ }
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean res = cache.remove(key);
+
+ assertTrue(res);
+
+ updateKey(cache, key, 2);
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ // Check no conflict for removeAll with single key.
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.removeAll(Collections.singleton(key));
+
+ txAsync(cache, PESSIMISTIC, REPEATABLE_READ,
+ new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+ @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+ cache.remove(key);
+
+ return null;
+ }
+ }
+ );
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ cache.put(key, 2);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean res = cache.remove(key);
+
+ assertTrue(res);
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean res = cache.remove(key);
+
+ assertFalse(res);
+
+ tx.commit();
+ }
+
+ checkValue(key, null, cache.getName());
+
+ try {
+ cache.put(key, 1);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object val = cache.get(key);
+
+ assertEquals(1, val);
+
+ boolean res = cache.remove(key);
+
+ assertTrue(res);
+
+ updateKey(cache, key, 2);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoConflictPut1() throws Exception {
+ txNoConflictUpdate(true, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoConflictPut2() throws Exception {
+ txNoConflictUpdate(false, false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoConflictPut3() throws Exception {
+ txNoConflictUpdate(false, false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoConflictRemove1() throws Exception {
+ txNoConflictUpdate(true, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoConflictRemove2() throws Exception {
+ txNoConflictUpdate(false, true, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoConflictRemove3() throws Exception {
+ txNoConflictUpdate(false, true, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ * @param noVal If {@code true} there is no cache value when do update in tx.
+ * @param rmv If {@code true} tests remove, otherwise put.
+ * @param getAfterUpdate If {@code true} tries to get value in tx after update.
+ */
+ private void txNoConflictUpdate(boolean noVal, boolean rmv, boolean getAfterUpdate) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ if (!noVal)
+ cache.put(key, -1);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ if (rmv)
+ cache.remove(key);
+ else
+ cache.put(key, 2);
+
+ if (getAfterUpdate) {
+ Object val = cache.get(key);
+
+ if (rmv)
+ assertNull(val);
+ else
+ assertEquals(2, val);
+ }
+
+ if (!rmv)
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ checkValue(key, rmv ? null : 2, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.put(key, 3);
+
+ tx.commit();
+ }
+
+ checkValue(key, 3, cache.getName());
+ }
+
+ Map<Integer, Integer> map = new HashMap<>();
+
+ for (int i = 0; i < 100; i++)
+ map.put(i, i);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ if (rmv)
+ cache.removeAll(map.keySet());
+ else
+ cache.putAll(map);
+
+ if (getAfterUpdate) {
+ Map<Integer, Integer> res = cache.getAll(map.keySet());
+
+ if (rmv) {
+ for (Integer key : map.keySet())
+ assertNull(res.get(key));
+ }
+ else {
+ for (Integer key : map.keySet())
+ assertEquals(map.get(key), res.get(key));
+ }
+ }
+
+ txAsync(cache, PESSIMISTIC, REPEATABLE_READ,
+ new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
+ @Override public Void apply(IgniteCache<Integer, Integer> cache) {
+ Map<Integer, Integer> map = new HashMap<>();
+
+ for (int i = 0; i < 100; i++)
+ map.put(i, -1);
+
+ cache.putAll(map);
+
+ return null;
+ }
+ }
+ );
+
+ tx.commit();
+ }
+
+ for (int i = 0; i < 100; i++)
+ checkValue(i, rmv ? null : i, cache.getName());
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoConflictContainsKey1() throws Exception {
+ txNoConflictContainsKey(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxNoConflictContainsKey2() throws Exception {
+ txNoConflictContainsKey(true);
+ }
+
+ /**
+ * @param noVal If {@code true} there is no cache value when do update in tx.
+ * @throws Exception If failed.
+ */
+ private void txNoConflictContainsKey(boolean noVal) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ if (!noVal)
+ cache.put(key, -1);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean res = cache.containsKey(key);
+
+ assertEquals(!noVal, res);
+
+ updateKey(cache, key, 1);
+
+ tx.commit();
+ }
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean res = cache.containsKey(key);
+
+ assertTrue(res);
+
+ updateKey(cache, key, 2);
+
+ tx.commit();
+ }
+
+ checkValue(key, 2, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean res = cache.containsKey(key);
+
+ assertTrue(res);
+
+ tx.commit();
+ }
+
+ cache.remove(key);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ boolean res = cache.containsKey(key);
+
+ assertFalse(res);
+
+ updateKey(cache, key, 3);
+
+ tx.commit();
+ }
+
+ checkValue(key, 3, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxRollbackIfLocked1() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> keys = testKeys(cache);
+
+ for (Integer key : keys) {
+ log.info("Test key: " + key);
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> fut = lockKey(latch, cache, key);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.put(key, 2);
+
+ log.info("Commit");
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ latch.countDown();
+
+ fut.get();
+
+ checkValue(key, 1, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.put(key, 2);
+
+ tx.commit();
+ }
+
+ checkValue(key, 2, cache.getName());
+ }
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxRollbackIfLocked2() throws Exception {
+ rollbackIfLockedPartialLock(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxRollbackIfLocked3() throws Exception {
+ rollbackIfLockedPartialLock(true);
+ }
+
+ /**
+ * @param locKey If {@code true} gets lock for local key.
+ * @throws Exception If failed.
+ */
+ private void rollbackIfLockedPartialLock(boolean locKey) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ try {
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ final Integer key1 = primaryKey(ignite(1).cache(cache.getName()));
+ final Integer key2 = locKey ? primaryKey(cache) : primaryKey(ignite(2).cache(cache.getName()));
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> fut = lockKey(latch, cache, key1);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.put(key1, 2);
+ cache.put(key2, 2);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ latch.countDown();
+
+ fut.get();
+
+ checkValue(key1, 1, cache.getName());
+ checkValue(key2, null, cache.getName());
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.put(key1, 2);
+ cache.put(key2, 2);
+
+ tx.commit();
+ }
+
+ checkValue(key1, 2, cache.getName());
+ checkValue(key2, 2, cache.getName());
+ }
+ finally {
+ destroyCache(ignite0, ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNearCacheReaderUpdate() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ IgniteCache<Integer, Integer> cache0 =
+ ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
+
+ final String cacheName = cache0.getName();
+
+ try {
+ Ignite client1 = ignite(SRVS);
+ Ignite client2 = ignite(SRVS + 1);
+
+ IgniteCache<Integer, Integer> cache1 = client1.createNearCache(cacheName,
+ new NearCacheConfiguration<Integer, Integer>());
+ IgniteCache<Integer, Integer> cache2 = client2.createNearCache(cacheName,
+ new NearCacheConfiguration<Integer, Integer>());
+
+ Integer key = primaryKey(ignite(0).cache(cacheName));
+
+ try (Transaction tx = client1.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+ assertNull(cache1.get(key));
+ cache1.put(key, 1);
+
+ tx.commit();
+ }
+
+ try (Transaction tx = client2.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+ assertEquals(1, (Object) cache2.get(key));
+ cache2.put(key, 2);
+
+ tx.commit();
+ }
+
+ try (Transaction tx = client1.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+ assertEquals(2, (Object)cache1.get(key));
+ cache1.put(key, 3);
+
+ tx.commit();
+ }
+ }
+ finally {
+ ignite0.destroyCache(cacheName);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRollbackNearCache1() throws Exception {
+ rollbackNearCacheWrite(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRollbackNearCache2() throws Exception {
+ rollbackNearCacheWrite(false);
+ }
+
+ /**
+ * @param near If {@code true} locks entry using the same near cache.
+ * @throws Exception If failed.
+ */
+ private void rollbackNearCacheWrite(boolean near) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ IgniteCache<Integer, Integer> cache0 =
+ ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
+
+ final String cacheName = cache0.getName();
+
+ try {
+ Ignite ignite = ignite(SRVS);
+
+ IgniteCache<Integer, Integer> cache = ignite.createNearCache(cacheName,
+ new NearCacheConfiguration<Integer, Integer>());
+
+ IgniteTransactions txs = ignite.transactions();
+
+ Integer key1 = primaryKey(ignite(0).cache(cacheName));
+ Integer key2 = primaryKey(ignite(1).cache(cacheName));
+ Integer key3 = primaryKey(ignite(2).cache(cacheName));
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> fut = null;
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.put(key1, key1);
+ cache.put(key2, key2);
+ cache.put(key3, key3);
+
+ fut = lockKey(latch, near ? cache : cache0, key2);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ latch.countDown();
+
+ assert fut != null;
+
+ fut.get();
+
+ checkValue(key1, null, cacheName);
+ checkValue(key2, 1, cacheName);
+ checkValue(key3, null, cacheName);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.put(key1, key1);
+ cache.put(key2, key2);
+ cache.put(key3, key3);
+
+ tx.commit();
+ }
+
+ checkValue(key1, key1, cacheName);
+ checkValue(key2, key2, cacheName);
+ checkValue(key3, key3, cacheName);
+ }
+ finally {
+ ignite0.destroyCache(cacheName);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRollbackNearCache3() throws Exception {
+ rollbackNearCacheRead(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRollbackNearCache4() throws Exception {
+ rollbackNearCacheRead(false);
+ }
+
+ /**
+ * @param near If {@code true} updates entry using the same near cache.
+ * @throws Exception If failed.
+ */
+ private void rollbackNearCacheRead(boolean near) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ IgniteCache<Integer, Integer> cache0 =
+ ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
+
+ final String cacheName = cache0.getName();
+
+ try {
+ Ignite ignite = ignite(SRVS);
+
+ IgniteCache<Integer, Integer> cache = ignite.createNearCache(cacheName,
+ new NearCacheConfiguration<Integer, Integer>());
+
+ IgniteTransactions txs = ignite.transactions();
+
+ Integer key1 = primaryKey(ignite(0).cache(cacheName));
+ Integer key2 = primaryKey(ignite(1).cache(cacheName));
+ Integer key3 = primaryKey(ignite(2).cache(cacheName));
+
+ cache0.put(key1, -1);
+ cache0.put(key2, -1);
+ cache0.put(key3, -1);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.get(key1);
+ cache.get(key2);
+ cache.get(key3);
+
+ updateKey(near ? cache : cache0, key2, -2);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key1, -1, cacheName);
+ checkValue(key2, -2, cacheName);
+ checkValue(key3, -1, cacheName);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.put(key1, key1);
+ cache.put(key2, key2);
+ cache.put(key3, key3);
+
+ tx.commit();
+ }
+
+ checkValue(key1, key1, cacheName);
+ checkValue(key2, key2, cacheName);
+ checkValue(key3, key3, cacheName);
+ }
+ finally {
+ ignite0.destroyCache(cacheName);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCrossCacheTx() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final String CACHE1 = "cache1";
+ final String CACHE2 = "cache2";
+
+ try {
+ CacheConfiguration<Integer, Integer> ccfg1 =
+ cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false);
+
+ ccfg1.setName(CACHE1);
+
+ ignite0.createCache(ccfg1);
+
+ CacheConfiguration<Integer, Integer> ccfg2=
+ cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false);
+
+ ccfg2.setName(CACHE2);
+
+ ignite0.createCache(ccfg2);
+
+ Integer newVal = 0;
+
+ List<Integer> keys = testKeys(ignite0.<Integer, Integer>cache(CACHE1));
+
+ for (Ignite ignite : G.allGrids()) {
+ log.info("Test node: " + ignite.name());
+
+ IgniteCache<Integer, Integer> cache1 = ignite.cache(CACHE1);
+ IgniteCache<Integer, Integer> cache2 = ignite.cache(CACHE2);
+
+ IgniteTransactions txs = ignite.transactions();
+
+ for (Integer key : keys) {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache1.put(key, newVal);
+ cache2.put(key, newVal);
+
+ tx.commit();
+ }
+
+ checkValue(key, newVal, CACHE1);
+ checkValue(key, newVal, CACHE2);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object val1 = cache1.get(key);
+ Object val2 = cache2.get(key);
+
+ assertEquals(newVal, val1);
+ assertEquals(newVal, val2);
+
+ tx.commit();
+ }
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache1.put(key, newVal + 1);
+ cache2.put(key, newVal + 1);
+
+ tx.rollback();
+ }
+
+ checkValue(key, newVal, CACHE1);
+ checkValue(key, newVal, CACHE2);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object val1 = cache1.get(key);
+ Object val2 = cache2.get(key);
+
+ assertEquals(newVal, val1);
+ assertEquals(newVal, val2);
+
+ cache1.put(key, newVal + 1);
+ cache2.put(key, newVal + 1);
+
+ tx.commit();
+ }
+
+ newVal++;
+
+ checkValue(key, newVal, CACHE1);
+ checkValue(key, newVal, CACHE2);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache1.put(key, newVal);
+ cache2.put(-key, newVal);
+
+ tx.commit();
+ }
+
+ checkValue(key, newVal, CACHE1);
+ checkValue(-key, null, CACHE1);
+
+ checkValue(key, newVal, CACHE2);
+ checkValue(-key, newVal, CACHE2);
+ }
+
+ newVal++;
+
+ Integer key1 = primaryKey(ignite(0).cache(CACHE1));
+ Integer key2 = primaryKey(ignite(1).cache(CACHE1));
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache1.put(key1, newVal);
+ cache1.put(key2, newVal);
+
+ cache2.put(key1, newVal);
+ cache2.put(key2, newVal);
+
+ tx.commit();
+ }
+
+ checkValue(key1, newVal, CACHE1);
+ checkValue(key2, newVal, CACHE1);
+ checkValue(key1, newVal, CACHE2);
+ checkValue(key2, newVal, CACHE2);
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> fut = lockKey(latch, cache1, key1);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache1.put(key1, newVal + 1);
+ cache2.put(key1, newVal + 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ latch.countDown();
+
+ fut.get();
+
+ checkValue(key1, 1, CACHE1);
+ checkValue(key1, newVal, CACHE2);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache1.put(key1, newVal + 1);
+ cache2.put(key1, newVal + 1);
+
+ tx.commit();
+ }
+
+ newVal++;
+
+ cache1.put(key2, newVal);
+ cache2.put(key2, newVal);
+
+ checkValue(key1, newVal, CACHE1);
+ checkValue(key1, newVal, CACHE2);
+
+ latch = new CountDownLatch(1);
+
+ fut = lockKey(latch, cache1, key1);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache1.put(key1, newVal + 1);
+ cache2.put(key2, newVal + 1);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ latch.countDown();
+
+ fut.get();
+
+ checkValue(key1, 1, CACHE1);
+ checkValue(key2, newVal, CACHE2);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Object val1 = cache1.get(key1);
+
<TRUNCATED>