You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2014/12/22 00:04:09 UTC
[16/46] incubator-ignite git commit: GG-9141 - Renaming
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java
new file mode 100644
index 0000000..58c3333
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java
@@ -0,0 +1,484 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.cache.affinity.*;
+import org.gridgain.grid.kernal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
+import static org.apache.ignite.transactions.GridCacheTxIsolation.*;
+
+/**
+ * Tests for local transactions.
+ */
+@SuppressWarnings( {"BusyWait"})
+abstract class IgniteTxAbstractTest extends GridCommonAbstractTest {
+ /** Random number generator. */
+ private static final Random RAND = new Random();
+
+ /** Execution count. */
+ private static final AtomicInteger cntr = new AtomicInteger();
+
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /**
+ * Creates the test. {@code false} is passed as the parent's "start grid" flag, so no grid is
+ * started by the constructor; grids are started explicitly in {@link #beforeTestsStarted()}.
+ */
+ protected IgniteTxAbstractTest() {
+ super(false /*start grid. */);
+ }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        // Discovery SPI wired to the static IP finder shared by all grids of this test.
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        return cfg;
+    }
+
+ /**
+ * @return Grid count: number of grids started in {@link #beforeTestsStarted()} and compared in
+ * {@link #finalChecks()}.
+ */
+ protected abstract int gridCount();
+
+ /**
+ * @return Key count: number of random keys generated by {@link #getKeys()} for one transaction.
+ */
+ protected abstract int keyCount();
+
+ /**
+ * @return Maximum key value (inclusive); generated keys are in range {@code [1, maxKeyValue()]}.
+ */
+ protected abstract int maxKeyValue();
+
+ /**
+ * @return Thread iterations: number of transactions executed by a single check method call.
+ */
+ protected abstract int iterations();
+
+ /**
+ * @return True if in-test logging is enabled (see {@link #debug(String)}).
+ */
+ protected abstract boolean isTestDebug();
+
+ /**
+ * @return {@code True} if memory stats should be printed at the end of {@code checkCommit(...)}.
+ */
+ protected abstract boolean printMemoryStats();
+
+ /**
+ * Passes the message to {@code info(...)}, but only when {@link #isTestDebug()} returns {@code true}.
+ *
+ * @param msg Message to log.
+ */
+ private void debug(String msg) {
+ if (isTestDebug())
+ info(msg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Override protected void beforeTestsStarted() throws Exception {
+ for (int i = 0; i < gridCount(); i++)
+ startGrid(i);
+ }
+
+ /**
+ * Stops all grids after the tests of this class have finished.
+ *
+ * @throws Exception If failed.
+ */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @param i Grid index.
+ * @return Cache.
+ */
+ @SuppressWarnings("unchecked")
+ @Override protected GridCache<Integer, String> cache(int i) {
+ return grid(i).cache(null);
+ }
+
+    /**
+     * Generates {@link #keyCount()} random keys in range {@code [1, maxKeyValue()]} and sorts them
+     * in ascending order. Duplicate keys are possible.
+     *
+     * @return Keys (unmodifiable, sorted).
+     */
+    protected Iterable<Integer> getKeys() {
+        List<Integer> res = new ArrayList<>(keyCount());
+
+        int generated = 0;
+
+        while (generated < keyCount()) {
+            res.add(RAND.nextInt(maxKeyValue()) + 1);
+
+            generated++;
+        }
+
+        Collections.sort(res);
+
+        return Collections.unmodifiableList(res);
+    }
+
+    /**
+     * Picks one of the three cache operations at random.
+     *
+     * @return Random cache operation.
+     */
+    protected OP getOp() {
+        int dice = RAND.nextInt(3);
+
+        if (dice == 0)
+            return OP.READ;
+
+        if (dice == 1)
+            return OP.WRITE;
+
+        if (dice == 2)
+            return OP.REMOVE;
+
+        // Should never be reached.
+        assert false;
+
+        return null;
+    }
+
+    /**
+     * Runs {@link #iterations()} transactions on a randomly chosen grid. Every transaction applies a
+     * random operation (read, write or remove) to each key returned by {@link #getKeys()} and is
+     * then committed.
+     * <p>
+     * A {@link GridCacheTxOptimisticException} is tolerated (the transaction is rolled back and the
+     * loop continues) only for {@code OPTIMISTIC} concurrency combined with {@code SERIALIZABLE}
+     * isolation. Any other failure rolls the transaction back and is rethrown.
+     *
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @throws Exception If check failed.
+     */
+    protected void checkCommit(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation) throws Exception {
+        int gridIdx = RAND.nextInt(gridCount());
+
+        Ignite ignite = grid(gridIdx);
+
+        if (isTestDebug())
+            debug("Checking commit on grid: " + ignite.cluster().localNode().id());
+
+        for (int i = 0; i < iterations(); i++) {
+            GridCache<Integer, String> cache = cache(gridIdx);
+
+            IgniteTx tx = cache.txStart(concurrency, isolation, 0, 0);
+
+            try {
+                int prevKey = -1;
+
+                for (Integer key : getKeys()) {
+                    // Make sure we have the same locking order for all concurrent transactions.
+                    assert key >= prevKey : "key: " + key + ", prevKey: " + prevKey;
+
+                    // Remember the key, otherwise the ordering assertion above can never fail.
+                    prevKey = key;
+
+                    if (isTestDebug()) {
+                        GridCacheAffinityFunction aff = cache.configuration().getAffinity();
+
+                        int part = aff.partition(key);
+
+                        debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" +
+                            U.toShortString(cache.affinity().mapPartitionToPrimaryAndBackups(part)) + ']');
+                    }
+
+                    String val = Integer.toString(key);
+
+                    switch (getOp()) {
+                        case READ: {
+                            if (isTestDebug())
+                                debug("Reading key [key=" + key + ", i=" + i + ']');
+
+                            val = cache.get(key);
+
+                            if (isTestDebug())
+                                debug("Read value for key [key=" + key + ", val=" + val + ']');
+
+                            break;
+                        }
+
+                        case WRITE: {
+                            if (isTestDebug())
+                                debug("Writing key and value [key=" + key + ", val=" + val + ", i=" + i + ']');
+
+                            cache.put(key, val);
+
+                            break;
+                        }
+
+                        case REMOVE: {
+                            if (isTestDebug())
+                                debug("Removing key [key=" + key + ", i=" + i + ']');
+
+                            cache.remove(key);
+
+                            break;
+                        }
+
+                        default: { assert false; }
+                    }
+                }
+
+                tx.commit();
+
+                if (isTestDebug())
+                    debug("Committed transaction [i=" + i + ", tx=" + tx + ']');
+            }
+            catch (GridCacheTxOptimisticException e) {
+                // Optimistic failures are legal only for OPTIMISTIC + SERIALIZABLE transactions.
+                if (concurrency != OPTIMISTIC || isolation != SERIALIZABLE) {
+                    error("Received invalid optimistic failure.", e);
+
+                    throw e;
+                }
+
+                if (isTestDebug())
+                    info("Optimistic transaction failure (will rollback) [i=" + i + ", msg=" + e.getMessage() +
+                        ", tx=" + tx.xid() + ']');
+
+                try {
+                    tx.rollback();
+                }
+                catch (IgniteCheckedException ex) {
+                    error("Failed to rollback optimistic failure: " + tx, ex);
+
+                    throw ex;
+                }
+            }
+            catch (Exception e) {
+                error("Transaction failed (will rollback): " + tx, e);
+
+                tx.rollback();
+
+                throw e;
+            }
+            catch (Error e) {
+                error("Error when executing transaction (will rollback): " + tx, e);
+
+                tx.rollback();
+
+                throw e;
+            }
+            finally {
+                // Whatever the outcome, the thread must not be bound to a transaction anymore.
+                IgniteTx t = cache.tx();
+
+                assert t == null : "Thread should not have transaction upon completion ['t==tx'=" + (t == tx) +
+                    ", t=" + t + ", tx=" + (t != tx ? tx : "''") + ']';
+            }
+        }
+
+        // Print transaction memory stats on every 100th invocation (counter is shared by all threads).
+        if (printMemoryStats() && cntr.getAndIncrement() % 100 == 0)
+            ((GridKernal)grid(gridIdx)).internalCache().context().tm().printMemoryStats();
+    }
+
+ /**
+ * Runs the rollback check with a fresh, empty map of observed values.
+ *
+ * @param concurrency Concurrency.
+ * @param isolation Isolation.
+ * @throws Exception If check failed.
+ */
+ protected void checkRollback(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation)
+ throws Exception {
+ checkRollback(new ConcurrentHashMap<Integer, String>(), concurrency, isolation);
+ }
+
+    /**
+     * Runs {@link #iterations()} transactions on a randomly chosen grid. Every transaction applies a
+     * random operation (read, write or remove) to each key returned by {@link #getKeys()} and is
+     * then rolled back. Each non-null value returned by a cache operation is verified against
+     * {@code map} (see {@code checkMap(...)}).
+     *
+     * @param map Map to check.
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @throws Exception If check failed.
+     */
+    protected void checkRollback(ConcurrentMap<Integer, String> map, GridCacheTxConcurrency concurrency,
+        GridCacheTxIsolation isolation) throws Exception {
+        int gridIdx = RAND.nextInt(gridCount());
+
+        Ignite ignite = grid(gridIdx);
+
+        if (isTestDebug())
+            debug("Checking rollback on grid: " + ignite.cluster().localNode().id());
+
+        for (int i = 0; i < iterations(); i++) {
+            GridCache<Integer, String> cache = cache(gridIdx);
+
+            IgniteTx tx = cache.txStart(concurrency, isolation, 0, 0);
+
+            try {
+                for (Integer key : getKeys()) {
+                    if (isTestDebug()) {
+                        GridCacheAffinityFunction aff = cache.configuration().getAffinity();
+
+                        int part = aff.partition(key);
+
+                        debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" +
+                            U.toShortString(cache.affinity().mapPartitionToPrimaryAndBackups(part)) + ']');
+                    }
+
+                    String val = Integer.toString(key);
+
+                    switch (getOp()) {
+                        case READ: {
+                            debug("Reading key: " + key);
+
+                            checkMap(map, key, cache.get(key));
+
+                            break;
+                        }
+
+                        case WRITE: {
+                            debug("Writing key and value [key=" + key + ", val=" + val + ']');
+
+                            checkMap(map, key, cache.put(key, val));
+
+                            break;
+                        }
+
+                        case REMOVE: {
+                            debug("Removing key: " + key);
+
+                            checkMap(map, key, cache.remove(key));
+
+                            break;
+                        }
+
+                        default: { assert false; }
+                    }
+                }
+
+                tx.rollback();
+
+                debug("Rolled back transaction: " + tx);
+            }
+            catch (GridCacheTxOptimisticException e) {
+                tx.rollback();
+
+                log.warning("Rolled back transaction due to optimistic exception [tx=" + tx + ", e=" + e + ']');
+
+                throw e;
+            }
+            catch (Exception e) {
+                tx.rollback();
+
+                // Pass the throwable as well so that the stack trace is not lost in the log.
+                error("Rolled back transaction due to exception [tx=" + tx + ", e=" + e + ']', e);
+
+                throw e;
+            }
+            finally {
+                // Whatever the outcome, the thread must not be bound to a transaction anymore.
+                IgniteTx t1 = cache.tx();
+
+                debug("t1=" + t1);
+
+                assert t1 == null : "Thread should not have transaction upon completion ['t==tx'=" + (t1 == tx) +
+                    ", t=" + t1 + ']';
+            }
+        }
+    }
+
+    /**
+     * Records the first non-null value seen for a key and asserts that every later non-null value
+     * seen for the same key is equal to it. {@code null} values are ignored.
+     *
+     * @param map Map to check against.
+     * @param key Key.
+     * @param val Value.
+     */
+    private void checkMap(ConcurrentMap<Integer, String> map, Integer key, String val) {
+        if (val == null)
+            return;
+
+        String prev = map.putIfAbsent(key, val);
+
+        assert prev == null || prev.equals(val) :
+            "Unexpected value [key=" + key + ", expected=" + prev + ", actual=" + val + ']';
+    }
+
+    /**
+     * Checks integrity of all caches after tests: no grid may still have a transaction bound to the
+     * calling thread, and every key in range {@code [1, maxKeyValue()]} must have the same value on
+     * all grids. The check for a key is attempted up to three times with a 500 ms pause between
+     * attempts. Afterwards all caches are cleared.
+     *
+     * @throws Exception If check failed.
+     */
+    @SuppressWarnings({"ErrorNotRethrown"})
+    protected void finalChecks() throws Exception {
+        for (int i = 1; i <= maxKeyValue(); i++) {
+            for (int k = 0; k < 3; k++) {
+                try {
+                    GridCacheEntry<Integer, String> e1 = null;
+
+                    String v1 = null;
+
+                    for (int j = 0; j < gridCount(); j++) {
+                        GridCache<Integer, String> cache = cache(j);
+
+                        IgniteTx tx = cache.tx();
+
+                        assertNull("Transaction is not completed: " + tx, tx);
+
+                        if (j == 0) {
+                            // Value on the first grid is the reference for all other grids.
+                            e1 = cache.entry(i);
+
+                            v1 = e1.get();
+                        }
+                        else {
+                            GridCacheEntry<Integer, String> e2 = cache.entry(i);
+
+                            String v2 = e2.get();
+
+                            // Re-read both values once before failing.
+                            if (!F.eq(v2, v1)) {
+                                v1 = e1.get();
+                                v2 = e2.get();
+                            }
+
+                            assert F.eq(v2, v1) :
+                                "Invalid cached value [key=" + i + ", v1=" + v1 + ", v2=" + v2 + ", e1=" + e1 +
+                                    ", e2=" + e2 + ", grid=" + j + ']';
+                        }
+                    }
+
+                    break;
+                }
+                catch (AssertionError e) {
+                    if (k == 2)
+                        throw e;
+                    else
+                        // Wait for transactions to complete.
+                        Thread.sleep(500);
+                }
+            }
+        }
+
+        // Clear the caches. removeAll() does not depend on a key, so it is executed once per grid
+        // instead of being repeated for every key value.
+        for (int k = 0; k < 3; k++) {
+            try {
+                for (int j = 0; j < gridCount(); j++) {
+                    GridCacheProjection<Integer, String> cache = cache(j);
+
+                    cache.removeAll();
+
+//                    assert cache.keySet().isEmpty() : "Cache is not empty: " + cache.entrySet();
+                }
+
+                break;
+            }
+            catch (AssertionError e) {
+                if (k == 2)
+                    throw e;
+                else
+                    // Wait for transactions to complete.
+                    Thread.sleep(500);
+            }
+        }
+    }
+
+ /**
+ * Cache operation; one is picked at random by {@link #getOp()} for every key of a transaction.
+ */
+ protected enum OP {
+ /** Cache read. */
+ READ,
+
+ /** Cache write. */
+ WRITE,
+
+ /** Cache remove. */
+ REMOVE
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java
new file mode 100644
index 0000000..9548cd0
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java
@@ -0,0 +1,134 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
+import static org.apache.ignite.transactions.GridCacheTxIsolation.*;
+
+/**
+ * Checks multithreaded put/get cache operations on one node.
+ */
+public abstract class IgniteTxConcurrentGetAbstractTest extends GridCommonAbstractTest {
+ /** Debug flag. */
+ private static final boolean DEBUG = false;
+
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int THREAD_NUM = 20;
+
+ /**
+ * Default constructor. Passes {@code true} as the parent's "start grid" flag.
+ */
+ protected IgniteTxConcurrentGetAbstractTest() {
+ super(true /* Start grid. */);
+ }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        // Discovery SPI wired to the static IP finder of this test.
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        igniteCfg.setDiscoverySpi(discoSpi);
+
+        return igniteCfg;
+    }
+
+    /**
+     * Casts the internal cache of the given grid to a near cache adapter.
+     *
+     * @param g Grid.
+     * @return Near cache.
+     */
+    GridNearCacheAdapter<String, Integer> near(Ignite g) {
+        GridKernal kernal = (GridKernal)g;
+
+        return (GridNearCacheAdapter<String, Integer>)kernal.<String, Integer>internalCache();
+    }
+
+    /**
+     * Returns the DHT cache that backs the near cache of the given grid.
+     *
+     * @param g Grid.
+     * @return DHT cache.
+     */
+    GridDhtCacheAdapter<String, Integer> dht(Ignite g) {
+        GridNearCacheAdapter<String, Integer> nearCache = near(g);
+
+        return nearCache.dht();
+    }
+
+ /**
+ * JUnit.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutGet() throws Exception {
+ // Random key.
+ final String key = UUID.randomUUID().toString();
+
+ final Ignite ignite = grid();
+
+ ignite.cache(null).put(key, "val");
+
+ GridCacheEntryEx<String,Integer> dhtEntry = dht(ignite).peekEx(key);
+
+ if (DEBUG)
+ info("DHT entry [hash=" + System.identityHashCode(dhtEntry) + ", entry=" + dhtEntry + ']');
+
+ String val = txGet(ignite, key);
+
+ assertNotNull(val);
+
+ info("Starting threads: " + THREAD_NUM);
+
+ multithreaded(new Callable<String>() {
+ @Override public String call() throws Exception {
+ return txGet(ignite, key);
+ }
+ }, THREAD_NUM, "getter-thread");
+ }
+
+    /**
+     * Reads the key inside a {@code PESSIMISTIC} / {@code REPEATABLE_READ} transaction, asserts that
+     * the value equals {@code "val"} and commits.
+     *
+     * @param ignite Grid.
+     * @param key Key.
+     * @return Value.
+     * @throws Exception If failed.
+     */
+    private String txGet(Ignite ignite, String key) throws Exception {
+        try (IgniteTx tx = ignite.cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            GridCacheEntryEx<String, Integer> entry = dht(ignite).peekEx(key);
+
+            if (DEBUG) {
+                int hash = System.identityHashCode(entry);
+
+                info("DHT entry [hash=" + hash + ", xid=" + tx.xid() +
+                    ", entry=" + entry + ']');
+            }
+
+            String res = ignite.<String, String>cache(null).get(key);
+
+            assertNotNull(res);
+            assertEquals("val", res);
+
+            tx.commit();
+
+            return res;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
new file mode 100644
index 0000000..7b39975
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
@@ -0,0 +1,631 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.indexing.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+import org.gridgain.testframework.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ * Tests that transaction is invalidated in case of {@link GridCacheTxHeuristicException}.
+ */
+public abstract class IgniteTxExceptionAbstractSelfTest extends GridCacheAbstractSelfTest {
+ /** Index SPI throwing exception. */
+ private static TestIndexingSpi idxSpi = new TestIndexingSpi();
+
+ /** */
+ private static final int PRIMARY = 0;
+
+ /** */
+ private static final int BACKUP = 1;
+
+ /** */
+ private static final int NOT_PRIMARY_AND_BACKUP = 2;
+
+ /** */
+ private static Integer lastKey;
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ // NOTE(review): presumably three nodes so that grid 0 can be primary, backup or neither for a key
+ // (see the PRIMARY, BACKUP and NOT_PRIMARY_AND_BACKUP key types) - confirm against the backup count.
+ return 3;
+ }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        igniteCfg.getTransactionsConfiguration().setTxSerializableEnabled(true);
+
+        // All grids share the same indexing SPI instance that fails on demand.
+        igniteCfg.setIndexingSpi(idxSpi);
+
+        return igniteCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        GridCacheConfiguration cacheCfg = super.cacheConfiguration(gridName);
+
+        // No store; query indexing is enabled.
+        cacheCfg.setStore(null);
+        cacheCfg.setQueryIndexEnabled(true);
+
+        return cacheCfg;
+    }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ // Reset the static key sequence used by keyForNode(...).
+ lastKey = 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ // Make sure a failure requested by a test does not leak into the next one.
+ // NOTE(review): super.afterTest() is not called here - confirm that skipping the parent cleanup is intended.
+ idxSpi.forceFail(false);
+ }
+
+    /**
+     * Checks failing put for {@code NOT_PRIMARY_AND_BACKUP} keys of grid 0, first with and then
+     * without a value put beforehand.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutNear() throws Exception {
+        for (boolean putBefore : new boolean[] {true, false})
+            checkPut(putBefore, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+    }
+
+    /**
+     * Checks failing put for {@code PRIMARY} keys of grid 0, first with and then without a value
+     * put beforehand.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutPrimary() throws Exception {
+        for (boolean putBefore : new boolean[] {true, false})
+            checkPut(putBefore, keyForNode(grid(0).localNode(), PRIMARY));
+    }
+
+    /**
+     * Checks failing put for {@code BACKUP} keys of grid 0, first with and then without a value
+     * put beforehand.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutBackup() throws Exception {
+        for (boolean putBefore : new boolean[] {true, false})
+            checkPut(putBefore, keyForNode(grid(0).localNode(), BACKUP));
+    }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAll() throws Exception {
+ checkPutAll(true, keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY));
+
+ checkPutAll(false, keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY));
+
+ if (gridCount() > 1) {
+ checkPutAll(true, keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY));
+
+ checkPutAll(false, keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY));
+ }
+ }
+
+    /**
+     * Checks failing remove for {@code NOT_PRIMARY_AND_BACKUP} keys of grid 0, first without and
+     * then with a value put beforehand.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRemoveNear() throws Exception {
+        for (boolean putBefore : new boolean[] {false, true})
+            checkRemove(putBefore, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+    }
+
+    /**
+     * Checks failing remove for {@code PRIMARY} keys of grid 0, first without and then with a value
+     * put beforehand.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRemovePrimary() throws Exception {
+        for (boolean putBefore : new boolean[] {false, true})
+            checkRemove(putBefore, keyForNode(grid(0).localNode(), PRIMARY));
+    }
+
+    /**
+     * Checks failing remove for {@code BACKUP} keys of grid 0, first without and then with a value
+     * put beforehand.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRemoveBackup() throws Exception {
+        for (boolean putBefore : new boolean[] {false, true})
+            checkRemove(putBefore, keyForNode(grid(0).localNode(), BACKUP));
+    }
+
+    /**
+     * Checks failing transform for {@code NOT_PRIMARY_AND_BACKUP} keys of grid 0, first without and
+     * then with a value put beforehand.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTransformNear() throws Exception {
+        for (boolean putBefore : new boolean[] {false, true})
+            checkTransform(putBefore, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+    }
+
+    /**
+     * Checks failing transform for {@code PRIMARY} keys of grid 0, first without and then with a
+     * value put beforehand.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTransformPrimary() throws Exception {
+        for (boolean putBefore : new boolean[] {false, true})
+            checkTransform(putBefore, keyForNode(grid(0).localNode(), PRIMARY));
+    }
+
+    /**
+     * Checks failing transform for {@code BACKUP} keys of grid 0, first without and then with a
+     * value put beforehand.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTransformBackup() throws Exception {
+        for (boolean putBefore : new boolean[] {false, true})
+            checkTransform(putBefore, keyForNode(grid(0).localNode(), BACKUP));
+    }
+
+    /**
+     * Checks failing transactional put for {@code NOT_PRIMARY_AND_BACKUP} keys of grid 0 for every
+     * combination of transaction concurrency and isolation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutNearTx() throws Exception {
+        for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) {
+            for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) {
+                for (boolean putBefore : new boolean[] {true, false})
+                    checkPutTx(putBefore, concurrency, isolation,
+                        keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+            }
+        }
+    }
+
+    /**
+     * Checks failing transactional put for {@code PRIMARY} keys of grid 0 for every combination of
+     * transaction concurrency and isolation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutPrimaryTx() throws Exception {
+        for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) {
+            for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) {
+                for (boolean putBefore : new boolean[] {true, false})
+                    checkPutTx(putBefore, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY));
+            }
+        }
+    }
+
+    /**
+     * Checks failing transactional put for {@code BACKUP} keys of grid 0 for every combination of
+     * transaction concurrency and isolation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutBackupTx() throws Exception {
+        for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) {
+            for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) {
+                for (boolean putBefore : new boolean[] {true, false})
+                    checkPutTx(putBefore, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP));
+            }
+        }
+    }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutMultipleKeysTx() throws Exception {
+ for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) {
+ for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) {
+ checkPutTx(true, concurrency, isolation,
+ keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY));
+
+ checkPutTx(false, concurrency, isolation,
+ keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY));
+
+ if (gridCount() > 1) {
+ checkPutTx(true, concurrency, isolation,
+ keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY));
+
+ checkPutTx(false, concurrency, isolation,
+ keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY));
+ }
+ }
+ }
+ }
+
+    /**
+     * Checks that a transactional put fails with {@link GridCacheTxHeuristicException} when the
+     * indexing SPI is forced to fail, and that afterwards none of the keys has a value or a lock on
+     * any grid.
+     *
+     * @param putBefore If {@code true} then puts some value before executing failing operation.
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @param keys Keys.
+     * @throws Exception If failed.
+     */
+    private void checkPutTx(boolean putBefore, GridCacheTxConcurrency concurrency,
+        GridCacheTxIsolation isolation, final Integer... keys) throws Exception {
+        assertTrue(keys.length > 0);
+
+        info("Test transaction [concurrency=" + concurrency + ", isolation=" + isolation + ']');
+
+        GridCache<Integer, Integer> cache = grid(0).cache(null);
+
+        if (putBefore) {
+            idxSpi.forceFail(false);
+
+            putInTx(cache, concurrency, isolation, 1, keys);
+        }
+
+        // Execute get from all nodes to create readers for near cache.
+        for (int i = 0; i < gridCount(); i++) {
+            for (Integer key : keys)
+                grid(i).cache(null).get(key);
+        }
+
+        idxSpi.forceFail(true);
+
+        try {
+            putInTx(cache, concurrency, isolation, 2, keys);
+
+            fail("Transaction should fail.");
+        }
+        catch (GridCacheTxHeuristicException e) {
+            log.info("Expected exception: " + e);
+        }
+
+        for (Integer key : keys)
+            checkEmpty(key);
+    }
+
+    /**
+     * Puts the same value for all given keys within a single transaction and commits it.
+     *
+     * @param cache Cache.
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @param val Value to put for every key.
+     * @param keys Keys.
+     * @throws Exception If failed.
+     */
+    private void putInTx(GridCache<Integer, Integer> cache, GridCacheTxConcurrency concurrency,
+        GridCacheTxIsolation isolation, Integer val, Integer... keys) throws Exception {
+        info("Start transaction.");
+
+        try (IgniteTx tx = cache.txStart(concurrency, isolation)) {
+            for (Integer key : keys) {
+                info("Put " + key);
+
+                cache.put(key, val);
+            }
+
+            info("Commit.");
+
+            tx.commit();
+        }
+    }
+
+    /**
+     * Turns forced failures off and verifies that on every grid the cache map entry for the key (and
+     * the DHT entry when the cache is near), if present, is neither locked nor has a value, and that
+     * {@code get} returns {@code null} on every grid.
+     *
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void checkEmpty(final Integer key) throws Exception {
+        idxSpi.forceFail(false);
+
+        info("Check key: " + key);
+
+        for (int i = 0; i < gridCount(); i++) {
+            GridKernal kernal = (GridKernal) grid(i);
+
+            GridCacheAdapter cache = kernal.internalCache(null);
+
+            GridCacheMapEntry entry = cache.map().getEntry(key);
+
+            log.info("Entry: " + entry);
+
+            assertUnlockedAndEmpty(i, entry);
+
+            if (!cache.isNear())
+                continue;
+
+            entry = ((GridNearCacheAdapter)cache).dht().map().getEntry(key);
+
+            log.info("Dht entry: " + entry);
+
+            assertUnlockedAndEmpty(i, entry);
+        }
+
+        for (int i = 0; i < gridCount(); i++)
+            assertEquals("Unexpected value for grid " + i, null, grid(i).cache(null).get(key));
+    }
+
+    /**
+     * Asserts that the entry, if it exists, is not locked and has no value.
+     *
+     * @param i Grid index (used in the failure message only).
+     * @param entry Entry to check, possibly {@code null}.
+     * @throws Exception If failed.
+     */
+    private void assertUnlockedAndEmpty(int i, GridCacheMapEntry entry) throws Exception {
+        if (entry == null)
+            return;
+
+        assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.lockedByAny());
+        assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.hasValue());
+    }
+
+    /**
+     * Forces the indexing SPI to fail, expects {@code put} on grid 0 to throw
+     * {@link GridCacheTxHeuristicException} and then verifies that the key is empty on all grids.
+     *
+     * @param putBefore If {@code true} then puts some value before executing failing operation.
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void checkPut(boolean putBefore, final Integer key) throws Exception {
+        if (putBefore) {
+            idxSpi.forceFail(false);
+
+            info("Put key: " + key);
+
+            grid(0).cache(null).put(key, 1);
+        }
+
+        // Execute get from all nodes to create readers for near cache.
+        int idx = 0;
+
+        while (idx < gridCount()) {
+            grid(idx).cache(null).get(key);
+
+            idx++;
+        }
+
+        idxSpi.forceFail(true);
+
+        info("Going to put: " + key);
+
+        Callable<Void> failingPut = new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                grid(0).cache(null).put(key, 2);
+
+                return null;
+            }
+        };
+
+        GridTestUtils.assertThrows(log, failingPut, GridCacheTxHeuristicException.class, null);
+
+        checkEmpty(key);
+    }
+
+    /**
+     * Forces the indexing SPI to fail, expects {@code transform} on grid 0 to throw
+     * {@link GridCacheTxHeuristicException} and then verifies that the key is empty on all grids.
+     *
+     * @param putBefore If {@code true} then puts some value before executing failing operation.
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void checkTransform(boolean putBefore, final Integer key) throws Exception {
+        if (putBefore) {
+            idxSpi.forceFail(false);
+
+            info("Put key: " + key);
+
+            grid(0).cache(null).put(key, 1);
+        }
+
+        // Execute get from all nodes to create readers for near cache.
+        int idx = 0;
+
+        while (idx < gridCount()) {
+            grid(idx).cache(null).get(key);
+
+            idx++;
+        }
+
+        idxSpi.forceFail(true);
+
+        info("Going to transform: " + key);
+
+        Callable<Void> failingTransform = new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                grid(0).cache(null).transform(key, new IgniteClosure<Object, Object>() {
+                    @Override public Object apply(Object o) {
+                        return 2;
+                    }
+                });
+
+                return null;
+            }
+        };
+
+        GridTestUtils.assertThrows(log, failingTransform, GridCacheTxHeuristicException.class, null);
+
+        checkEmpty(key);
+    }
+
+    /**
+     * Forces the indexing SPI to fail, expects {@code putAll} on grid 0 to throw
+     * {@link GridCacheTxHeuristicException} and then verifies that every key is empty on all grids.
+     *
+     * @param putBefore If {@code true} then puts some value before executing failing operation.
+     * @param keys Keys.
+     * @throws Exception If failed.
+     */
+    private void checkPutAll(boolean putBefore, Integer ... keys) throws Exception {
+        assert keys.length > 1;
+
+        if (putBefore) {
+            idxSpi.forceFail(false);
+
+            Map<Integer, Integer> initial = new HashMap<>();
+
+            for (Integer key : keys)
+                initial.put(key, 1);
+
+            info("Put data: " + initial);
+
+            grid(0).cache(null).putAll(initial);
+        }
+
+        // Execute get from all nodes to create readers for near cache.
+        for (int i = 0; i < gridCount(); i++) {
+            for (Integer key : keys)
+                grid(i).cache(null).get(key);
+        }
+
+        idxSpi.forceFail(true);
+
+        final Map<Integer, Integer> m = new HashMap<>();
+
+        for (Integer key : keys)
+            m.put(key, 2);
+
+        info("Going to putAll: " + m);
+
+        Callable<Void> failingPutAll = new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                grid(0).cache(null).putAll(m);
+
+                return null;
+            }
+        };
+
+        GridTestUtils.assertThrows(log, failingPutAll, GridCacheTxHeuristicException.class, null);
+
+        for (Integer key : m.keySet())
+            checkEmpty(key);
+    }
+
+    /**
+     * Forces the indexing SPI to fail, expects {@code remove} on grid 0 to throw
+     * {@link GridCacheTxHeuristicException} and then verifies that the key is empty on all grids.
+     *
+     * @param putBefore If {@code true} then puts some value before executing failing operation.
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void checkRemove(boolean putBefore, final Integer key) throws Exception {
+        if (putBefore) {
+            idxSpi.forceFail(false);
+
+            info("Put key: " + key);
+
+            grid(0).cache(null).put(key, 1);
+        }
+
+        // Execute get from all nodes to create readers for near cache.
+        int idx = 0;
+
+        while (idx < gridCount()) {
+            grid(idx).cache(null).get(key);
+
+            idx++;
+        }
+
+        idxSpi.forceFail(true);
+
+        info("Going to remove: " + key);
+
+        Callable<Void> failingRemove = new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                grid(0).cache(null).remove(key);
+
+                return null;
+            }
+        };
+
+        GridTestUtils.assertThrows(log, failingRemove, GridCacheTxHeuristicException.class, null);
+
+        checkEmpty(key);
+    }
+
+    /**
+     * Generates key of a given type for given node. For a {@code LOCAL} cache, and for a
+     * {@code REPLICATED} cache with type {@code NOT_PRIMARY_AND_BACKUP}, the next sequential key is
+     * returned. Otherwise up to 10,000 keys following the last generated one are probed with the
+     * cache affinity. The returned key is remembered as the last generated key.
+     *
+     * @param node Node.
+     * @param type Key type.
+     * @return Key.
+     */
+    private Integer keyForNode(ClusterNode node, int type) {
+        GridCache<Integer, Integer> cache = grid(0).cache(null);
+
+        GridCacheMode mode = cache.configuration().getCacheMode();
+
+        if (mode == LOCAL || (mode == REPLICATED && type == NOT_PRIMARY_AND_BACKUP))
+            return ++lastKey;
+
+        for (int key = lastKey + 1; key < (lastKey + 10_000); key++) {
+            boolean matches;
+
+            if (type == NOT_PRIMARY_AND_BACKUP)
+                matches = !cache.affinity().isPrimaryOrBackup(node, key);
+            else if (type == PRIMARY)
+                matches = cache.affinity().isPrimary(node, key);
+            else if (type == BACKUP)
+                matches = cache.affinity().isBackup(node, key);
+            else {
+                // Unknown key type.
+                fail();
+
+                matches = false;
+            }
+
+            if (matches) {
+                lastKey = key;
+
+                return key;
+            }
+        }
+
+        throw new IllegalStateException("Failed to find key.");
+    }
+
+    /**
+     * Indexing SPI that can fail on demand: while the fail flag is set, the next {@code store} or
+     * {@code remove} call resets the flag and throws {@link IgniteSpiException}.
+     */
+    private static class TestIndexingSpi extends IgniteSpiAdapter implements GridIndexingSpi {
+        /** Fail flag. */
+        private volatile boolean fail;
+
+        /**
+         * @param fail Fail flag.
+         */
+        public void forceFail(boolean fail) {
+            this.fail = fail;
+        }
+
+        /**
+         * Resets the fail flag and throws the test exception if a failure was requested.
+         *
+         * @throws IgniteSpiException If failure was requested.
+         */
+        private void failIfRequested() throws IgniteSpiException {
+            if (!fail)
+                return;
+
+            fail = false;
+
+            throw new IgniteSpiException("Test exception.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<?> query(@Nullable String spaceName, Collection<Object> params, @Nullable GridIndexingQueryFilter filters) throws IgniteSpiException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void store(@Nullable String spaceName, Object key, Object val, long expirationTime)
+            throws IgniteSpiException {
+            failIfRequested();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void remove(@Nullable String spaceName, Object k)
+            throws IgniteSpiException {
+            failIfRequested();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteSpiException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUnswap(@Nullable String spaceName, Object key, Object val) throws IgniteSpiException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStop() throws IgniteSpiException {
+            // No-op.
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java
new file mode 100644
index 0000000..4822742
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java
@@ -0,0 +1,918 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.cache.affinity.*;
+import org.gridgain.grid.cache.query.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.gridgain.grid.cache.GridCacheMode.*;
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
+import static org.apache.ignite.transactions.GridCacheTxIsolation.*;
+
+
+/**
+ * Checks basic multi-node transactional operations.
+ */
+@SuppressWarnings({"PointlessBooleanExpression", "ConstantConditions", "PointlessArithmeticExpression"})
+public abstract class IgniteTxMultiNodeAbstractTest extends GridCommonAbstractTest {
+ /** Debug flag. */
+ private static final boolean DEBUG = false;
+
+ /** Number of grid nodes started by each test. */
+ protected static final int GRID_CNT = 4;
+
+ /** IP finder shared by the discovery SPIs of all grids. */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Number of transactional iterations performed by each job or worker thread. */
+ protected static final int RETRIES = 300;
+
+ /** Log frequency. */
+ private static final int LOG_FREQ = RETRIES < 100 || DEBUG ? 1 : RETRIES / 5;
+
+ /** Counter key. */
+ private static final String CNTR_KEY = "CNTR_KEY";
+
+ /** Removed counter key. */
+ private static final String RMVD_CNTR_KEY = "RMVD_CNTR_KEY";
+
+ /** Shared iteration counter: reset before each test and used to throttle progress logging. */
+ protected static final AtomicInteger cntr = new AtomicInteger();
+
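+ /** Removal counter: throttles logging in the queried removal loop and supplies item keys in the simple one. */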
+ protected static final AtomicInteger cntrRmvd = new AtomicInteger();
+
+ /** Number of backups for partitioned tests. */
+ protected int backups = 2;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Takes the parent configuration and installs a TCP discovery SPI that uses the shared
+ * {@link #ipFinder}.
+ */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+ // Every grid started by this test discovers its peers through the same IP finder.
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(ipFinder);
+
+ igniteCfg.setDiscoverySpi(discoSpi);
+
+ return igniteCfg;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Resets {@link #backups} to {@code 0} (replacing the field's initial value of {@code 2}) and
+ * zeroes {@link #cntr}. {@link #cntrRmvd} is not touched here; the removal tests reset it themselves.
+ */
+ @Override protected void beforeTest() throws Exception {
+ backups = 0;
+
+ cntr.set(0);
+ }
+
+ /**
+ * Resolves the ID of the first node in the affinity mapping (primary and backups) of the
+ * partition that the given key belongs to, using the default ({@code null}-named) cache.
+ *
+ * @param ignite Grid
+ * @param key Key.
+ * @return Primary node id.
+ */
+ @SuppressWarnings("unchecked")
+ private static UUID primaryId(Ignite ignite, Object key) {
+ GridCacheAffinity aff = ignite.cache(null).cache().affinity();
+
+ // The affinity is used as a raw type, so this assignment is unchecked (hence the suppression).
+ Collection<ClusterNode> affNodes = aff.mapPartitionToPrimaryAndBackups(aff.partition(key));
+
+ // NOTE(review): presumably the primary node is the first element of the mapping - confirm.
+ ClusterNode first = F.first(affNodes);
+
+ assert first != null;
+
+ return first.id();
+ }
+
+ /**
+ * Peeks at the entry for the given key in the DHT cache that backs the near cache of the
+ * specified node.
+ *
+ * @param nodeId Node ID.
+ * @param key Key.
+ * @return DHT entry.
+ */
+ @Nullable private static GridCacheEntryEx<Object, Integer> dhtEntry(UUID nodeId, Object key) {
+ GridKernal kernal = (GridKernal)G.ignite(nodeId);
+
+ GridNearCacheAdapter<Object, Integer> nearCache =
+ kernal.<Object, Integer>internalCache().context().near();
+
+ GridDhtCacheAdapter<Object, Integer> dhtCache = nearCache.dht();
+
+ return dhtCache.peekEx(key);
+ }
+
+ /**
+ * Peeks at the entry for the given key in the near cache of the specified node.
+ *
+ * @param nodeId Node ID.
+ * @param key Key.
+ * @return Near entry.
+ */
+ @Nullable private static GridCacheEntryEx<Object, Integer> nearEntry(UUID nodeId, Object key) {
+ GridKernal kernal = (GridKernal)G.ignite(nodeId);
+
+ GridNearCacheAdapter<Object, Integer> nearCache =
+ kernal.<Object, Integer>internalCache().context().near();
+
+ return nearCache.peekEx(key);
+ }
+
+ /**
+ * Runs one pessimistic, repeatable-read transaction for an item key whose primary node is not
+ * the local node: reads the shared counter, optionally writes the incremented counter back, and
+ * stores the incremented value under the item key.
+ *
+ * @param putCntr Put counter to cache.
+ * @param ignite Grid.
+ * @param itemKey Item key.
+ * @param retry Retry count.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private void onItemNear(boolean putCntr, Ignite ignite, String itemKey, int retry) throws IgniteCheckedException {
+ GridCache<String, Integer> cache = ignite.cache(null);
+
+ UUID locId = ignite.cluster().localNode().id();
+ UUID itemPrimaryId = primaryId(ignite, itemKey);
+ UUID cntrPrimaryId = primaryId(ignite, CNTR_KEY);
+
+ boolean isCntrPrimary = cntrPrimaryId.equals(locId);
+
+ try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ if (DEBUG)
+ info("Before near get [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() +
+ ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId +
+ ", nearEntry=" + nearEntry(locId, CNTR_KEY) +
+ (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']');
+
+ Integer cntr = cache.get(CNTR_KEY);
+
+ // Fail with diagnostics instead of a bare NPE on unboxing, like the removal methods do.
+ assert cntr != null : "Received null counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
+ ", nearEntry=" + nearEntry(locId, CNTR_KEY) +
+ (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']';
+
+ int newVal = cntr + 1;
+
+ if (putCntr) {
+ if (DEBUG)
+ info("Before near put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
+ ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, CNTR_KEY) +
+ (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']');
+
+ cache.putx(CNTR_KEY, newVal);
+ }
+
+ if (DEBUG)
+ info("Before near put item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr + ", new=" + newVal +
+ ", nearEntry=" + nearEntry(locId, itemKey) + ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+ cache.putx(itemKey, newVal);
+
+ // Message fixed: "dhtEntry" was missing its '=' separator.
+ if (DEBUG)
+ info("After near put item [retry=" + retry + ", key=" + itemKey + ", old=" + cntr + ", new=" + newVal +
+ ", nearEntry=" + nearEntry(locId, itemKey) + ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+ tx.commit();
+ }
+ }
+
+ /**
+ * Runs one pessimistic, repeatable-read transaction for an item key whose primary node is the
+ * local node: reads the shared counter, optionally writes the incremented counter back, and
+ * stores a value under the item key.
+ *
+ * @param putCntr Put counter to cache.
+ * @param ignite Grid.
+ * @param itemKey Item key.
+ * @param retry Retry count.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private void onItemPrimary(boolean putCntr, Ignite ignite, String itemKey, int retry) throws IgniteCheckedException {
+ GridCache<String, Integer> cache = ignite.cache(null);
+
+ UUID locId = ignite.cluster().localNode().id();
+ UUID itemPrimaryId = primaryId(ignite, itemKey);
+ UUID cntrPrimaryId = primaryId(ignite, CNTR_KEY);
+
+ boolean isCntrPrimary = cntrPrimaryId.equals(locId);
+
+ try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ if (DEBUG)
+ info("Before item primary get [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() +
+ ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId +
+ ", nearEntry=" + nearEntry(locId, CNTR_KEY) +
+ (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']');
+
+ // Unboxed below: a missing counter surfaces as a NullPointerException.
+ Integer cntr = cache.get(CNTR_KEY);
+
+ int newVal = cntr + 1;
+
+ if (putCntr) {
+ if (DEBUG)
+ info("Before item primary put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
+ ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, CNTR_KEY) +
+ (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']');
+
+ cache.putx(CNTR_KEY, newVal);
+ }
+
+ if (DEBUG)
+ info("Before item primary put item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
+ ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) +
+ ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+ // NOTE(review): this stores the current counter, while onItemNear() stores newVal and the
+ // surrounding log messages report "new=" + newVal - confirm which value is intended.
+ cache.putx(itemKey, cntr);
+
+ if (DEBUG)
+ info("After item primary put item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
+ ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) +
+ ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+ tx.commit();
+ }
+ }
+
+ /**
+ * Runs one pessimistic, repeatable-read transaction that reads the removal counter, optionally
+ * writes the decremented counter back, and removes one item located through an SQL query.
+ * <p>
+ * If the query yields no entry the method returns without committing, so the transaction is
+ * closed uncommitted by the try-with-resources block.
+ *
+ * @param putCntr Put counter to cache.
+ * @param ignite Grid.
+ * @param retry Retry count.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private void onRemoveItemQueried(boolean putCntr, Ignite ignite, int retry) throws IgniteCheckedException {
+ GridCache<String, Integer> cache = ignite.cache(null);
+
+ UUID locId = ignite.cluster().localNode().id();
+ UUID cntrPrimaryId = primaryId(ignite, RMVD_CNTR_KEY);
+
+ boolean isCntrPrimary = cntrPrimaryId.equals(locId);
+
+ try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ if (DEBUG)
+ ignite.log().info("Before item lock [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() +
+ ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId +
+ ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
+ (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']');
+
+ Integer cntr = cache.get(RMVD_CNTR_KEY);
+
+ assert cntr != null : "Received null counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
+ ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
+ (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']';
+
+ int newVal = cntr - 1;
+
+ if (putCntr) {
+ if (DEBUG)
+ ignite.log().info("Before item put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
+ ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
+ (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']');
+
+ cache.putx(RMVD_CNTR_KEY, newVal);
+ }
+
+ // Keep querying until an item that still exists has been removed within this transaction.
+ while (true) {
+ GridCacheQuery<Map.Entry<String, Integer>> qry =
+ cache.queries().createSqlQuery(Integer.class, "_key != 'RMVD_CNTR_KEY' and _val >= 0");
+
+ if (DEBUG)
+ ignite.log().info("Before executing query [retry=" + retry + ", locId=" + locId +
+ ", txId=" + tx.xid() + ']');
+
+ Map.Entry<String, Integer> entry = qry.execute().next();
+
+ if (entry == null) {
+ ignite.log().info("*** Queue is empty.");
+
+ return;
+ }
+
+ String itemKey = entry.getKey();
+
+ UUID itemPrimaryId = primaryId(ignite, itemKey);
+
+ // Lock the item key.
+ if (cache.get(itemKey) != null) {
+ if (DEBUG)
+ ignite.log().info("Before item remove [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
+ ", nearEntry=" + nearEntry(locId, itemKey) +
+ ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+ // The removal is done outside of any 'assert' so that it also happens when
+ // assertions are disabled; only the outcome is verified.
+ boolean rmvd = cache.removex(itemKey);
+
+ assertTrue("Failed to remove key [locId=" + locId +
+ ", primaryId=" + itemPrimaryId + ", key=" + itemKey + ']', rmvd);
+
+ if (DEBUG)
+ info("After item remove item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
+ ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) +
+ ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+ break;
+ }
+ else
+ cache.removex(itemKey);
+ }
+
+ tx.commit();
+ }
+ catch (Error e) {
+ // Log assertion failures through the grid logger before propagating them.
+ ignite.log().error("Error in test.", e);
+
+ throw e;
+ }
+ }
+
+ /**
+ * Runs one pessimistic, repeatable-read transaction that reads the removal counter, takes the
+ * next item key from {@link #cntrRmvd}, optionally writes the decremented counter back, and
+ * removes that item.
+ *
+ * @param putCntr Put counter to cache.
+ * @param ignite Grid.
+ * @param retry Retry count.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private void onRemoveItemSimple(boolean putCntr, Ignite ignite, int retry) throws IgniteCheckedException {
+ GridCache<String, Integer> cache = ignite.cache(null);
+
+ UUID locId = ignite.cluster().localNode().id();
+ UUID cntrPrimaryId = primaryId(ignite, RMVD_CNTR_KEY);
+
+ boolean isCntrPrimary = cntrPrimaryId.equals(locId);
+
+ try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ if (DEBUG)
+ ignite.log().info("Before item lock [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() +
+ ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId +
+ ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
+ (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']');
+
+ Integer cntr = cache.get(RMVD_CNTR_KEY);
+
+ assert cntr != null : "Received null counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
+ ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
+ (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']';
+
+ // The shared removal counter doubles as the generator of the next item key.
+ String itemKey = Integer.toString(cntrRmvd.getAndIncrement());
+
+ Integer val = cache.get(itemKey);
+
+ assert val != null : "Received null val [retry=" + retry + ", cacheSize=" + cache.size() + ']';
+
+ UUID itemPrimaryId = primaryId(ignite, itemKey);
+
+ int newVal = cntr - 1;
+
+ if (putCntr) {
+ if (DEBUG)
+ ignite.log().info("Before item put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
+ ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
+ (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']');
+
+ cache.putx(RMVD_CNTR_KEY, newVal);
+ }
+
+ if (DEBUG)
+ ignite.log().info("Before item remove item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
+ ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) +
+ ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+ assertTrue(cache.removex(itemKey));
+
+ // NOTE(review): the message below says "put item" although an item was just removed.
+ if (DEBUG)
+ info("After item put item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
+ ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) +
+ ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');
+
+ tx.commit();
+ }
+ catch (Error e) {
+ // Log assertion failures through the grid logger before propagating them.
+ ignite.log().error("Error in test.", e);
+
+ throw e;
+ }
+ }
+
+ /**
+ * Performs {@link #RETRIES} put transactions on the given grid, one per generated item key.
+ * Each key is handled by {@code onItemPrimary()} when the local node is its primary node and by
+ * {@code onItemNear()} otherwise.
+ *
+ * @param ignite Grid.
+ * @param putCntr Put counter to cache.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void retries(Ignite ignite, boolean putCntr) throws IgniteCheckedException {
+ UUID locId = ignite.cluster().localNode().id();
+
+ for (int retry = 0; retry < RETRIES; retry++) {
+ int iter = cntr.getAndIncrement();
+
+ // Progress is logged on every iteration in debug mode, otherwise once per LOG_FREQ iterations.
+ boolean logIter = DEBUG || iter % LOG_FREQ == 0;
+
+ if (DEBUG)
+ ignite.log().info("***");
+ if (logIter)
+ ignite.log().info("*** Iteration #" + retry + " ***");
+ if (DEBUG)
+ ignite.log().info("***");
+
+ String itemKey = locId + "-#" + retry;
+
+ boolean locPrimary = locId.equals(primaryId(ignite, itemKey));
+
+ if (locPrimary)
+ onItemPrimary(putCntr, ignite, itemKey, retry);
+ else
+ onItemNear(putCntr, ignite, itemKey, retry);
+ }
+ }
+
+ /**
+ * Performs {@link #RETRIES} query-based removal transactions on the given grid and prints
+ * transaction manager memory statistics every 50 iterations.
+ *
+ * @param ignite Grid.
+ * @param putCntr Put counter to cache.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void removeRetriesQueried(Ignite ignite, boolean putCntr) throws IgniteCheckedException {
+ for (int i = 0; i < RETRIES; i++) {
+ if (DEBUG)
+ ignite.log().info("***");
+
+ // Here cntrRmvd only throttles logging; it is advanced on every iteration while DEBUG is off.
+ if (DEBUG || cntrRmvd.getAndIncrement() % LOG_FREQ == 0)
+ ignite.log().info("*** Iteration #" + i + " ***");
+
+ if (DEBUG)
+ ignite.log().info("***");
+
+ onRemoveItemQueried(putCntr, ignite, i);
+
+ if (i % 50 == 0)
+ ((GridKernal) ignite).internalCache().context().tm().printMemoryStats();
+ }
+ }
+
+ /**
+ * Performs {@link #RETRIES} counter-keyed removal transactions on the given grid.
+ *
+ * @param ignite Grid.
+ * @param putCntr Put counter to cache.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void removeRetriesSimple(Ignite ignite, boolean putCntr) throws IgniteCheckedException {
+ for (int i = 0; i < RETRIES; i++) {
+ if (DEBUG)
+ ignite.log().info("***");
+
+ // cntrRmvd is only read here; onRemoveItemSimple() advances it when it picks the item key.
+ if (cntrRmvd.get() % LOG_FREQ == 0 || DEBUG)
+ ignite.log().info("*** Iteration #" + i + " ***");
+
+ if (DEBUG)
+ ignite.log().info("***");
+
+ onRemoveItemSimple(putCntr, ignite, i);
+ }
+ }
+
+ /**
+ * Starts the grids, seeds the counter with {@code 0} and submits a single
+ * {@link PutOneEntryInTxJob} through grid 0. There are no explicit assertions: the test passes
+ * if the job completes without throwing.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutOneEntryInTx() throws Exception {
+// resetLog4j(Level.INFO, true, GridCacheTxManager.class.getName());
+
+ startGrids(GRID_CNT);
+
+ try {
+ grid(0).cache(null).put(CNTR_KEY, 0);
+
+ grid(0).compute().call(new PutOneEntryInTxJob());
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Starts the grids, seeds the counter with {@code 0}, submits a {@link PutTwoEntriesInTxJob}
+ * through grid 0 and verifies the final counter value.
+ * <p>
+ * NOTE(review): the job is submitted once via {@code call()} and performs {@code RETRIES}
+ * increments per execution, while the assertion expects {@code GRID_CNT * RETRIES} - confirm
+ * that the job is really executed on every node.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutTwoEntriesInTx() throws Exception {
+// resetLog4j(Level.INFO, true, GridCacheTxManager.class.getName());
+
+ startGrids(GRID_CNT);
+
+ try {
+ grid(0).cache(null).put(CNTR_KEY, 0);
+
+ grid(0).compute().call(new PutTwoEntriesInTxJob());
+
+ printCounter();
+
+ assertEquals(GRID_CNT * RETRIES, grid(0).cache(null).get(CNTR_KEY));
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Starts the grids, seeds the counter with {@code 0} and runs the item-only put loop
+ * concurrently, one thread per grid. A failure raised in any worker thread fails the test.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutOneEntryInTxMultiThreaded() throws Exception {
+// resetLog4j(Level.INFO, true, GridCacheTxManager.class.getName());
+
+ startGrids(GRID_CNT);
+
+ Collection<Thread> threads = new LinkedList<>();
+
+ // First failure raised by a worker thread. Without it an exception would only kill the
+ // worker and the test would still pass.
+ final AtomicReference<Throwable> err = new AtomicReference<>();
+
+ try {
+ // Initialize.
+ grid(0).cache(null).put(CNTR_KEY, 0);
+
+ for (int i = 0; i < GRID_CNT; i++) {
+ final int gridId = i;
+
+ threads.add(new Thread("thread-#" + i) {
+ @Override public void run() {
+ try {
+ retries(grid(gridId), false);
+ }
+ catch (Throwable e) {
+ err.compareAndSet(null, e);
+ }
+ }
+ });
+ }
+
+ for (Thread th : threads)
+ th.start();
+
+ for (Thread th : threads)
+ th.join();
+
+ Throwable workerErr = err.get();
+
+ if (workerErr != null)
+ throw new AssertionError("Worker thread failed: " + workerErr, workerErr);
+
+ printCounter();
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Starts the grids, seeds the counter with {@code 0} and runs the counter-and-item put loop
+ * concurrently, one thread per grid, then verifies that the counter was incremented
+ * {@code GRID_CNT * RETRIES} times. A failure raised in any worker thread fails the test.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutTwoEntryInTxMultiThreaded() throws Exception {
+// resetLog4j(Level.INFO, true, GridCacheTxManager.class.getName());
+
+ startGrids(GRID_CNT);
+
+ Collection<Thread> threads = new LinkedList<>();
+
+ // First failure raised by a worker thread. Reporting it directly is more informative
+ // than a later counter mismatch.
+ final AtomicReference<Throwable> err = new AtomicReference<>();
+
+ try {
+ grid(0).cache(null).put(CNTR_KEY, 0);
+
+ for (int i = 0; i < GRID_CNT; i++) {
+ final int gridId = i;
+
+ threads.add(new Thread() {
+ @Override public void run() {
+ try {
+ retries(grid(gridId), true);
+ }
+ catch (Throwable e) {
+ err.compareAndSet(null, e);
+ }
+ }
+ });
+ }
+
+ for (Thread th : threads)
+ th.start();
+
+ for (Thread th : threads)
+ th.join();
+
+ Throwable workerErr = err.get();
+
+ if (workerErr != null)
+ throw new AssertionError("Worker thread failed: " + workerErr, workerErr);
+
+ printCounter();
+
+ assertEquals(GRID_CNT * RETRIES, grid(0).cache(null).get(CNTR_KEY));
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Populates the cache with {@code GRID_CNT * RETRIES} items plus the removal counter, submits a
+ * {@link RemoveInTxJobQueried} through grid 0 and verifies that all items are gone and that the
+ * removal counter equals {@code -GRID_CNT * RETRIES}.
+ * <p>
+ * NOTE(review): the job is submitted once via {@code call()} and performs {@code RETRIES}
+ * removals per execution - confirm that it is really executed on every node.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRemoveInTxQueried() throws Exception {
+ //resetLog4j(Level.INFO, true, GridCacheTxManager.class.getPackage().getName());
+
+ startGrids(GRID_CNT);
+
+ try {
+ GridCache<String, Integer> cache = grid(0).cache(null);
+
+ cache.put(RMVD_CNTR_KEY, 0);
+
+ for (int i = 0; i < GRID_CNT * RETRIES; i++)
+ cache.put(String.valueOf(i), i);
+
+ // Only the first RETRIES keys are read back from every grid.
+ for (int i = 0; i < RETRIES; i++)
+ for (int j = 0; j < GRID_CNT; j++)
+ assertEquals(i, grid(j).cache(null).get(String.valueOf(i)));
+
+ GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, " _val >= 0");
+
+ Collection<Map.Entry<String, Integer>> entries = qry.execute().get();
+
+ assertFalse(entries.isEmpty());
+
+ cntrRmvd.set(0);
+
+ grid(0).compute().call(new RemoveInTxJobQueried());
+
+ for (int i = 0; i < GRID_CNT * RETRIES; i++)
+ for (int ii = 0; ii < GRID_CNT; ii++)
+ assertEquals(null, grid(ii).cache(null).get(Integer.toString(i)));
+
+ assertEquals(-GRID_CNT * RETRIES, grid(0).cache(null).peek(RMVD_CNTR_KEY));
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Populates the cache with {@code GRID_CNT * RETRIES} items plus the removal counter, submits a
+ * {@link RemoveInTxJobSimple} through grid 0 and verifies, through both cache reads and a
+ * query, that all items are gone and that the removal counter equals {@code -GRID_CNT * RETRIES}.
+ * <p>
+ * NOTE(review): the job is submitted once via {@code call()} and performs {@code RETRIES}
+ * removals per execution - confirm that it is really executed on every node.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRemoveInTxSimple() throws Exception {
+ startGrids(GRID_CNT);
+
+ try {
+ GridCache<String, Integer> cache = grid(0).cache(null);
+
+ cache.put(RMVD_CNTR_KEY, 0);
+
+ for (int i = 0; i < GRID_CNT * RETRIES; i++)
+ cache.put(Integer.toString(i), i);
+
+ // Only the first RETRIES keys are read back from every grid.
+ for (int i = 0; i < RETRIES; i++)
+ for (int j = 0; j < GRID_CNT; j++)
+ assertEquals(i, grid(j).cache(null).get(Integer.toString(i)));
+
+ GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, " _val >= 0");
+
+ Collection<Map.Entry<String, Integer>> entries = qry.execute().get();
+
+ assertFalse(entries.isEmpty());
+
+ // Item keys are generated from this counter, so it must start at the first stored key.
+ cntrRmvd.set(0);
+
+ grid(0).compute().call(new RemoveInTxJobSimple());
+
+ // Check using cache.
+ for (int i = 0; i < GRID_CNT * RETRIES; i++)
+ for (int ii = 0; ii < GRID_CNT; ii++)
+ assertEquals(null, grid(ii).cache(null).get(Integer.toString(i)));
+
+ // Check using query.
+ entries = qry.execute().get();
+
+ assertTrue(entries.isEmpty());
+
+ assertEquals(-GRID_CNT * RETRIES, grid(0).cache(null).peek(RMVD_CNTR_KEY));
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Populates the cache with items keyed {@code 1..GRID_CNT * RETRIES} plus the removal counter,
+ * runs the query-based removal loop concurrently (one thread per grid) and verifies that every
+ * stored item is gone and that the removal counter equals {@code -GRID_CNT * RETRIES}.
+ * A failure raised in any worker thread fails the test.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRemoveInTxQueriedMultiThreaded() throws Exception {
+ //resetLog4j(Level.INFO, true, GridCacheTxManager.class.getPackage().getName());
+
+ backups = 1;
+
+ try {
+ startGrids(GRID_CNT);
+
+ GridCache<String, Integer> cache = grid(0).cache(null);
+
+ // Store counter.
+ cache.put(RMVD_CNTR_KEY, 0);
+
+ // Store values.
+ for (int i = 1; i <= GRID_CNT * RETRIES; i++)
+ cache.put(String.valueOf(i), i);
+
+ for (int j = 0; j < GRID_CNT; j++)
+ assertEquals(0, grid(j).cache(null).get(RMVD_CNTR_KEY));
+
+ for (int i = 1; i <= RETRIES; i++)
+ for (int j = 0; j < GRID_CNT; j++)
+ assertEquals(i, grid(j).cache(null).get(String.valueOf(i)));
+
+ GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, "_val >= 0");
+
+ // Load all results.
+ qry.keepAll(true);
+ qry.includeBackups(false);
+
+ // NOTE: for replicated cache includeBackups(false) is not enough since
+ // all nodes are considered primary, so we have to deduplicate result set.
+ if (cache.configuration().getCacheMode() == REPLICATED)
+ qry.enableDedup(true);
+
+ List<Map.Entry<String, Integer>> entries =
+ new ArrayList<>(qry.execute().get());
+
+ Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
+ @Override public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
+ return o1.getValue().compareTo(o2.getValue());
+ }
+ });
+
+ info("Queried entries: " + entries);
+
+ // The counter (value 0) and the items (1..N) must form a gapless, duplicate-free sequence.
+ int val = 0;
+
+ for (Map.Entry<String, Integer> e : entries) {
+ assertEquals(val, e.getValue().intValue());
+
+ val++;
+ }
+
+ assertFalse(entries.isEmpty());
+
+ cntrRmvd.set(0);
+
+ Collection<Thread> threads = new LinkedList<>();
+
+ // First failure raised by a worker thread. Without it an exception would only kill the worker.
+ final AtomicReference<Throwable> err = new AtomicReference<>();
+
+ for (int i = 0; i < GRID_CNT; i++) {
+ final int gridId = i;
+
+ threads.add(new Thread() {
+ @Override public void run() {
+ try {
+ removeRetriesQueried(grid(gridId), true);
+ }
+ catch (Throwable e) {
+ err.compareAndSet(null, e);
+ }
+ }
+ });
+ }
+
+ for (Thread th : threads)
+ th.start();
+
+ for (Thread th : threads)
+ th.join();
+
+ Throwable workerErr = err.get();
+
+ if (workerErr != null)
+ throw new AssertionError("Worker thread failed: " + workerErr, workerErr);
+
+ // Items were stored under keys 1..GRID_CNT * RETRIES, so verify exactly that range.
+ for (int i = 1; i <= GRID_CNT * RETRIES; i++)
+ for (int ii = 0; ii < GRID_CNT; ii++)
+ assertEquals("Got invalid value from cache [gridIdx=" + ii + ", key=" + i + ']',
+ null, grid(ii).cache(null).get(Integer.toString(i)));
+
+ assertEquals(-GRID_CNT * RETRIES, grid(0).cache(null).peek(RMVD_CNTR_KEY));
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Logs the counter value of grid 0 as seen through both {@code peek()} and {@code get()}.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ private void printCounter() throws IgniteCheckedException {
+ info("***");
+ info("*** Peeked counter: " + grid(0).cache(null).peek(CNTR_KEY));
+ info("*** Got counter: " + grid(0).cache(null).get(CNTR_KEY));
+ info("***");
+ }
+
+ /**
+ * Test job that runs the put loop writing both the counter and an item in each transaction.
+ */
+ protected class PutTwoEntriesInTxJob implements IgniteCallable<Integer> {
+ /** Grid the job runs on; annotated for injection and asserted non-null in {@link #call()}. */
+ @GridToStringExclude
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public Integer call() throws IgniteCheckedException {
+ assertNotNull(ignite);
+
+ ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]");
+
+ // putCntr = true: the counter is written back in every transaction.
+ retries(ignite, true);
+
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PutTwoEntriesInTxJob.class, this);
+ }
+ }
+
+ /**
+ * Test job that runs the put loop writing only an item (not the counter) in each transaction.
+ */
+ protected class PutOneEntryInTxJob implements IgniteCallable<Integer> {
+ /** Grid the job runs on; annotated for injection and asserted non-null in {@link #call()}. */
+ @GridToStringExclude
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public Integer call() throws IgniteCheckedException {
+ assertNotNull(ignite);
+
+ ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]");
+
+ // putCntr = false: the counter is read but never written back.
+ retries(ignite, false);
+
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PutOneEntryInTxJob.class, this);
+ }
+ }
+
+ /**
+ * Test job that runs the removal loop in which the item to remove is located with a query.
+ */
+ protected class RemoveInTxJobQueried implements IgniteCallable<Integer> {
+ /** Grid the job runs on; annotated for injection and asserted non-null in {@link #call()}. */
+ @GridToStringExclude
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public Integer call() throws IgniteCheckedException {
+ assertNotNull(ignite);
+
+ ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]");
+
+ // putCntr = true: the removal counter is written back in every transaction.
+ removeRetriesQueried(ignite, true);
+
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RemoveInTxJobQueried.class, this);
+ }
+ }
+
+ /**
+ * Test job that runs the removal loop in which item keys are taken from the removal counter.
+ */
+ protected class RemoveInTxJobSimple implements IgniteCallable<Integer> {
+ /** Grid the job runs on; annotated for injection and asserted non-null in {@link #call()}. */
+ @GridToStringExclude
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public Integer call() throws IgniteCheckedException {
+ assertNotNull(ignite);
+
+ ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]");
+
+ // putCntr = true: the removal counter is written back in every transaction.
+ removeRetriesSimple(ignite, true);
+
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RemoveInTxJobSimple.class, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiThreadedAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
new file mode 100644
index 0000000..efb7a26
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
@@ -0,0 +1,275 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.testframework.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
+import static org.apache.ignite.transactions.GridCacheTxIsolation.*;
+
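+/**
+ * Multi-threaded transaction tests: runs the inherited commit and rollback checks concurrently
+ * from {@link #threadCount()} threads for every concurrency/isolation combination.
+ */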
+@SuppressWarnings( {"BusyWait"})
+public abstract class IgniteTxMultiThreadedAbstractTest extends IgniteTxAbstractTest {
+ /**
+ * @return Number of threads used to run each multithreaded commit or rollback check.
+ */
+ protected abstract int threadCount();
+
+ /**
+ * Runs {@code checkCommit()} concurrently from {@link #threadCount()} threads, logging the
+ * start and the end of every worker.
+ *
+ * @param concurrency Concurrency.
+ * @param isolation Isolation.
+ * @throws Exception If check failed.
+ */
+ protected void checkCommitMultithreaded(final GridCacheTxConcurrency concurrency,
+ final GridCacheTxIsolation isolation) throws Exception {
+ Callable<Object> committer = new Callable<Object>() {
+ @Nullable @Override public Object call() throws Exception {
+ Thread cur = Thread.currentThread();
+
+ // Make the worker identifiable in the logs.
+ cur.setName(cur.getName() + "-id-" + cur.getId());
+
+ info("Starting commit thread: " + cur.getName());
+
+ try {
+ checkCommit(concurrency, isolation);
+
+ return null;
+ }
+ finally {
+ info("Finished commit thread: " + cur.getName());
+ }
+ }
+ };
+
+ GridTestUtils.runMultiThreaded(committer, threadCount(), concurrency + "-" + isolation);
+ }
+
+ /**
+ * Runs {@code checkRollback()} concurrently from {@link #threadCount()} threads; all workers
+ * share one concurrent map. The start and the end of every worker are logged.
+ *
+ * @param concurrency Concurrency.
+ * @param isolation Isolation.
+ * @throws Exception If check failed.
+ */
+ protected void checkRollbackMultithreaded(final GridCacheTxConcurrency concurrency,
+ final GridCacheTxIsolation isolation) throws Exception {
+ final ConcurrentMap<Integer, String> sharedMap = new ConcurrentHashMap<>();
+
+ Callable<Object> rollbacker = new Callable<Object>() {
+ @Nullable @Override public Object call() throws Exception {
+ Thread cur = Thread.currentThread();
+
+ // Make the worker identifiable in the logs.
+ cur.setName(cur.getName() + "-id-" + cur.getId());
+
+ info("Starting rollback thread: " + cur.getName());
+
+ try {
+ checkRollback(sharedMap, concurrency, isolation);
+ }
+ finally {
+ info("Finished rollback thread: " + cur.getName());
+ }
+
+ return null;
+ }
+ };
+
+ GridTestUtils.runMultiThreaded(rollbacker, threadCount(), concurrency + "-" + isolation);
+ }
+
+ /**
+ * Runs the multithreaded commit check for PESSIMISTIC / READ_COMMITTED transactions.
+ *
+ * @throws Exception If test failed.
+ */
+ public void testPessimisticReadCommittedCommitMultithreaded() throws Exception {
+ checkCommitMultithreaded(PESSIMISTIC, READ_COMMITTED);
+
+ finalChecks();
+ }
+
+ /**
+ * Runs the multithreaded commit check for PESSIMISTIC / REPEATABLE_READ transactions.
+ *
+ * @throws Exception If test failed.
+ */
+ public void testPessimisticRepeatableReadCommitMultithreaded() throws Exception {
+ checkCommitMultithreaded(PESSIMISTIC, REPEATABLE_READ);
+
+ finalChecks();
+ }
+
+ /**
+ * Runs the multithreaded commit check for PESSIMISTIC / SERIALIZABLE transactions.
+ *
+ * @throws Exception If test failed.
+ */
+ public void testPessimisticSerializableCommitMultithreaded() throws Exception {
+ checkCommitMultithreaded(PESSIMISTIC, SERIALIZABLE);
+
+ finalChecks();
+ }
+
+ /**
+ * Runs the multithreaded commit check for OPTIMISTIC / READ_COMMITTED transactions.
+ *
+ * @throws Exception If test failed.
+ */
+ public void testOptimisticReadCommittedCommitMultithreaded() throws Exception {
+ checkCommitMultithreaded(OPTIMISTIC, READ_COMMITTED);
+
+ finalChecks();
+ }
+
+ /**
+ * Runs the multithreaded commit check for OPTIMISTIC / REPEATABLE_READ transactions.
+ *
+ * @throws Exception If test failed.
+ */
+ public void testOptimisticRepeatableReadCommitMultithreaded() throws Exception {
+ checkCommitMultithreaded(OPTIMISTIC, REPEATABLE_READ);
+
+ finalChecks();
+ }
+
+ /**
+ * Runs the multithreaded commit check for OPTIMISTIC / SERIALIZABLE transactions.
+ *
+ * @throws Exception If test failed.
+ */
+ public void testOptimisticSerializableCommitMultithreaded() throws Exception {
+ checkCommitMultithreaded(OPTIMISTIC, SERIALIZABLE);
+
+ finalChecks();
+ }
+
+ /**
+ * Runs the multithreaded rollback check for PESSIMISTIC / READ_COMMITTED transactions.
+ *
+ * @throws Exception If test failed.
+ */
+ public void testPessimisticReadCommittedRollbackMultithreaded() throws Exception {
+ checkRollbackMultithreaded(PESSIMISTIC, READ_COMMITTED);
+
+ finalChecks();
+ }
+
+ /**
+ * Runs the multithreaded rollback check for PESSIMISTIC / REPEATABLE_READ transactions.
+ *
+ * @throws Exception If test failed.
+ */
+ public void testPessimisticRepeatableReadRollbackMultithreaded() throws Exception {
+ checkRollbackMultithreaded(PESSIMISTIC, REPEATABLE_READ);
+
+ finalChecks();
+ }
+
+ /**
+ * Runs the multithreaded rollback check for PESSIMISTIC / SERIALIZABLE transactions.
+ *
+ * @throws Exception If test failed.
+ */
+ public void testPessimisticSerializableRollbackMultithreaded() throws Exception {
+ checkRollbackMultithreaded(PESSIMISTIC, SERIALIZABLE);
+
+ finalChecks();
+ }
+
+ /**
+ * Runs the multithreaded rollback check for OPTIMISTIC / READ_COMMITTED transactions.
+ *
+ * @throws Exception If test failed.
+ */
+ public void testOptimisticReadCommittedRollbackMultithreaded() throws Exception {
+ checkRollbackMultithreaded(OPTIMISTIC, READ_COMMITTED);
+
+ finalChecks();
+ }
+
+ /**
+ * Runs the multithreaded rollback check for OPTIMISTIC / REPEATABLE_READ transactions.
+ *
+ * @throws Exception If test failed.
+ */
+ public void testOptimisticRepeatableReadRollbackMultithreaded() throws Exception {
+ checkRollbackMultithreaded(OPTIMISTIC, REPEATABLE_READ);
+
+ finalChecks();
+ }
+
+ /**
+ * Runs the multithreaded rollback check for OPTIMISTIC / SERIALIZABLE transactions.
+ *
+ * @throws Exception If test failed.
+ */
+ public void testOptimisticSerializableRollbackMultithreaded() throws Exception {
+ checkRollbackMultithreaded(OPTIMISTIC, SERIALIZABLE);
+
+ finalChecks();
+ }
+
+ /**
+ * Increments a single key concurrently in OPTIMISTIC / SERIALIZABLE transactions, retrying on
+ * optimistic failures, and checks that no value was committed by more than one thread.
+ * The leading underscore in the name keeps the test disabled.
+ *
+ * @throws Exception If failed.
+ */
+ // TODO: GG-8063, enabled when fixed.
+ public void _testOptimisticSerializableConsistency() throws Exception {
+ final GridCache<Integer, Long> cache = grid(0).cache(null);
+
+ final int THREADS = 2;
+
+ final int ITERATIONS = 100;
+
+ final int key = 0;
+
+ cache.put(key, 0L);
+
+ List<IgniteFuture<Collection<Long>>> futs = new ArrayList<>(THREADS);
+
+ for (int thread = 0; thread < THREADS; thread++) {
+ futs.add(GridTestUtils.runAsync(new Callable<Collection<Long>>() {
+ @Override public Collection<Long> call() throws Exception {
+ Collection<Long> committed = new ArrayList<>();
+
+ for (int iter = 0; iter < ITERATIONS; iter++) {
+ boolean done = false;
+
+ // Retry the increment until it commits without an optimistic failure.
+ while (!done) {
+ try (IgniteTx tx = cache.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ long cur = cache.get(key);
+
+ cache.put(key, cur + 1);
+
+ tx.commit();
+
+ assertTrue(committed.add(cur + 1));
+
+ done = true;
+ }
+ catch(GridCacheTxOptimisticException e) {
+ log.info("Got error, will retry: " + e);
+ }
+ }
+ }
+
+ return committed;
+ }
+ }));
+ }
+
+ List<Collection<Long>> results = new ArrayList<>(THREADS);
+
+ for (IgniteFuture<Collection<Long>> fut : futs) {
+ Collection<Long> res = fut.get();
+
+ assertEquals(ITERATIONS, res.size());
+
+ results.add(res);
+ }
+
+ // A value is a duplicate when it shows up in the results of two different threads.
+ Set<Long> duplicates = new HashSet<>();
+
+ for (Collection<Long> mine : results) {
+ for (Long val : mine) {
+ for (Collection<Long> other : results) {
+ if (mine != other && other.contains(val))
+ duplicates.add(val);
+ }
+ }
+ }
+
+ assertTrue("Found duplicated values: " + duplicates, duplicates.isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxReentryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxReentryAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxReentryAbstractSelfTest.java
new file mode 100644
index 0000000..8decc56
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxReentryAbstractSelfTest.java
@@ -0,0 +1,169 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.managers.communication.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.grid.util.direct.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.concurrent.atomic.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
+import static org.apache.ignite.transactions.GridCacheTxIsolation.*;
+
+/**
+ * Tests lock reentry in a pessimistic repeatable read tx by counting lock requests sent from the first grid.
+ */
+public abstract class IgniteTxReentryAbstractSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** @return Cache mode. */
+ protected abstract GridCacheMode cacheMode();
+
+ /** @return {@code True} to configure {@code NEAR_PARTITIONED} distribution, {@code false} for {@code PARTITIONED_ONLY}. */
+ protected abstract boolean nearEnabled();
+
+ /** @return Number of grids to start for the test. */
+ protected abstract int gridCount();
+
+ /** @return Key that is read and then removed inside the test transaction. */
+ protected abstract int testKey();
+
+ /** @return Expected number of near lock requests. */
+ protected abstract int expectedNearLockRequests();
+
+ /** @return Expected number of DHT lock requests. */
+ protected abstract int expectedDhtLockRequests();
+
+ /** @return Expected total number of distributed lock requests (near and DHT requests are part of this count). */
+ protected abstract int expectedDistributedLockRequests();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setCommunicationSpi(new CountingCommunicationSpi());
+ cfg.setDiscoverySpi(discoSpi);
+
+ GridCacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+ cacheCfg.setCacheMode(cacheMode());
+ cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+ cacheCfg.setAtomicityMode(TRANSACTIONAL);
+ cacheCfg.setDistributionMode(nearEnabled() ? NEAR_PARTITIONED : PARTITIONED_ONLY);
+
+ cfg.setCacheConfiguration(cacheCfg);
+
+ return cfg;
+ }
+
+ /** @throws Exception If failed. */
+ public void testLockReentry() throws Exception {
+ startGrids(gridCount());
+
+ try {
+ GridCache<Object, Object> cache = grid(0).cache(null);
+
+ // Key is supplied by the concrete test.
+ int key = testKey();
+
+ try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ // One near lock request.
+ cache.get(key);
+
+ // No more requests.
+ cache.remove(key);
+
+ tx.commit();
+ }
+
+ CountingCommunicationSpi commSpi = (CountingCommunicationSpi)grid(0).configuration().getCommunicationSpi();
+
+ assertEquals(expectedNearLockRequests(), commSpi.nearLocks());
+ assertEquals(expectedDhtLockRequests(), commSpi.dhtLocks());
+ assertEquals(expectedDistributedLockRequests(), commSpi.distributedLocks());
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /** Communication SPI that counts outgoing lock requests by type before sending them. */
+ protected static class CountingCommunicationSpi extends TcpCommunicationSpi {
+ /** Distributed lock requests (every request type is counted here). */
+ private final AtomicInteger distLocks = new AtomicInteger();
+
+ /** Near lock requests. */
+ private final AtomicInteger nearLocks = new AtomicInteger();
+
+ /** DHT lock requests. */
+ private final AtomicInteger dhtLocks = new AtomicInteger();
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg)
+ throws IgniteSpiException {
+ countMsg((GridIoMessage)msg);
+
+ super.sendMessage(node, msg);
+ }
+
+ /**
+ * Inspects the message wrapped by the given I/O message and increments the matching counters.
+ *
+ * @param msg Message to check.
+ */
+ private void countMsg(GridIoMessage msg) {
+ Object origMsg = msg.message();
+
+ if (origMsg instanceof GridDistributedLockRequest) {
+ distLocks.incrementAndGet();
+
+ if (origMsg instanceof GridNearLockRequest)
+ nearLocks.incrementAndGet();
+ else if (origMsg instanceof GridDhtLockRequest)
+ dhtLocks.incrementAndGet();
+ }
+ }
+
+ /** @return Number of recorded distributed lock requests. */
+ public int distributedLocks() {
+ return distLocks.get();
+ }
+
+ /** @return Number of recorded near lock requests. */
+ public int nearLocks() {
+ return nearLocks.get();
+ }
+
+ /** @return Number of recorded DHT lock requests. */
+ public int dhtLocks() {
+ return dhtLocks.get();
+ }
+ }
+}