You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2014/12/23 21:35:53 UTC
[10/53] [abbrv] incubator-ignite git commit: GG-9141 - Renaming
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxSingleThreadedAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxSingleThreadedAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxSingleThreadedAbstractTest.java
new file mode 100644
index 0000000..017bc16
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxSingleThreadedAbstractTest.java
@@ -0,0 +1,129 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.*;
+
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
+import static org.apache.ignite.transactions.GridCacheTxIsolation.*;
+
+/**
+ * Tests for local transactions.
+ */
+@SuppressWarnings( {"BusyWait"})
+public abstract class IgniteTxSingleThreadedAbstractTest extends IgniteTxAbstractTest {
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testPessimisticReadCommittedCommit() throws Exception {
+ checkCommit(PESSIMISTIC, READ_COMMITTED);
+
+ finalChecks();
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testPessimisticRepeatableReadCommit() throws Exception {
+ checkCommit(PESSIMISTIC, REPEATABLE_READ);
+
+ finalChecks();
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testPessimisticSerializableCommit() throws Exception {
+ checkCommit(PESSIMISTIC, SERIALIZABLE);
+
+ finalChecks();
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testOptimisticReadCommittedCommit() throws Exception {
+ checkCommit(OPTIMISTIC, READ_COMMITTED);
+
+ finalChecks();
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testOptimisticRepeatableReadCommit() throws Exception {
+ checkCommit(OPTIMISTIC, REPEATABLE_READ);
+
+ finalChecks();
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testOptimisticSerializableCommit() throws Exception {
+ checkCommit(OPTIMISTIC, SERIALIZABLE);
+
+ finalChecks();
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testPessimisticReadCommittedRollback() throws Exception {
+ checkRollback(PESSIMISTIC, READ_COMMITTED);
+
+ finalChecks();
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testPessimisticRepeatableReadRollback() throws Exception {
+ checkRollback(PESSIMISTIC, REPEATABLE_READ);
+
+ finalChecks();
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testPessimisticSerializableRollback() throws Exception {
+ checkRollback(PESSIMISTIC, SERIALIZABLE);
+
+ finalChecks();
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testOptimisticReadCommittedRollback() throws Exception {
+ checkRollback(OPTIMISTIC, READ_COMMITTED);
+
+ finalChecks();
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testOptimisticRepeatableReadRollback() throws Exception {
+ checkRollback(OPTIMISTIC, REPEATABLE_READ);
+
+ finalChecks();
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testOptimisticSerializableRollback() throws Exception {
+ checkRollback(OPTIMISTIC, SERIALIZABLE);
+
+ finalChecks();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
new file mode 100644
index 0000000..f79aea0
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
@@ -0,0 +1,631 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.cache.store.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+import org.gridgain.testframework.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ * Tests that transaction is invalidated in case of {@link GridCacheTxHeuristicException}.
+ */
+public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAbstractSelfTest {
+ /** Cache store that throws an exception when forced to fail. */
+ private static TestStore store = new TestStore();
+
+ /** Key type: the given node is primary for the key. */
+ private static final int PRIMARY = 0;
+
+ /** Key type: the given node is a backup for the key. */
+ private static final int BACKUP = 1;
+
+ /** Key type: the given node is neither primary nor backup for the key. */
+ private static final int NOT_PRIMARY_AND_BACKUP = 2;
+
+ /** Last key returned by {@code keyForNode}; the next key search starts right after it. */
+ private static Integer lastKey;
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.getTransactionsConfiguration().setTxSerializableEnabled(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ GridCacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+ ccfg.setStore(store);
+
+ return ccfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ lastKey = 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ store.forceFail(false);
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutNear() throws Exception {
+ checkPut(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+
+ checkPut(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutPrimary() throws Exception {
+ checkPut(true, keyForNode(grid(0).localNode(), PRIMARY));
+
+ checkPut(false, keyForNode(grid(0).localNode(), PRIMARY));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutBackup() throws Exception {
+ checkPut(true, keyForNode(grid(0).localNode(), BACKUP));
+
+ checkPut(false, keyForNode(grid(0).localNode(), BACKUP));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAll() throws Exception {
+ checkPutAll(true, keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY));
+
+ checkPutAll(false, keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY));
+
+ if (gridCount() > 1) {
+ checkPutAll(true, keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY));
+
+ checkPutAll(false, keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY));
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemoveNear() throws Exception {
+ checkRemove(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+
+ checkRemove(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemovePrimary() throws Exception {
+ checkRemove(false, keyForNode(grid(0).localNode(), PRIMARY));
+
+ checkRemove(true, keyForNode(grid(0).localNode(), PRIMARY));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemoveBackup() throws Exception {
+ checkRemove(false, keyForNode(grid(0).localNode(), BACKUP));
+
+ checkRemove(true, keyForNode(grid(0).localNode(), BACKUP));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTransformNear() throws Exception {
+ checkTransform(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+
+ checkTransform(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTransformPrimary() throws Exception {
+ checkTransform(false, keyForNode(grid(0).localNode(), PRIMARY));
+
+ checkTransform(true, keyForNode(grid(0).localNode(), PRIMARY));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTransformBackup() throws Exception {
+ checkTransform(false, keyForNode(grid(0).localNode(), BACKUP));
+
+ checkTransform(true, keyForNode(grid(0).localNode(), BACKUP));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutNearTx() throws Exception {
+ for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) {
+ for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) {
+ checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+
+ checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP));
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutPrimaryTx() throws Exception {
+ for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) {
+ for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) {
+ checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY));
+
+ checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY));
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutBackupTx() throws Exception {
+ for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) {
+ for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) {
+ checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP));
+
+ checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP));
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutMultipleKeysTx() throws Exception {
+ for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) {
+ for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) {
+ checkPutTx(true, concurrency, isolation,
+ keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY));
+
+ checkPutTx(false, concurrency, isolation,
+ keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY),
+ keyForNode(grid(0).localNode(), PRIMARY));
+
+ if (gridCount() > 1) {
+ checkPutTx(true, concurrency, isolation,
+ keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY));
+
+ checkPutTx(false, concurrency, isolation,
+ keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY),
+ keyForNode(grid(1).localNode(), PRIMARY));
+ }
+ }
+ }
+ }
+
+ /**
+ * @param putBefore If {@code true} then puts some value before executing failing operation.
+ * @param keys Keys.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @throws Exception If failed.
+ */
+ private void checkPutTx(boolean putBefore, GridCacheTxConcurrency concurrency,
+ GridCacheTxIsolation isolation, final Integer... keys) throws Exception {
+ assertTrue(keys.length > 0);
+
+ info("Test transaction [concurrency=" + concurrency + ", isolation=" + isolation + ']');
+
+ GridCache<Integer, Integer> cache = grid(0).cache(null);
+
+ if (putBefore) {
+ store.forceFail(false);
+
+ info("Start transaction.");
+
+ try (IgniteTx tx = cache.txStart(concurrency, isolation)) {
+ for (Integer key : keys) {
+ info("Put " + key);
+
+ cache.put(key, 1);
+ }
+
+ info("Commit.");
+
+ tx.commit();
+ }
+ }
+
+ // Execute get from all nodes to create readers for near cache.
+ for (int i = 0; i < gridCount(); i++) {
+ for (Integer key : keys)
+ grid(i).cache(null).get(key);
+ }
+
+ store.forceFail(true);
+
+ try {
+ info("Start transaction.");
+
+ try (IgniteTx tx = cache.txStart(concurrency, isolation)) {
+ for (Integer key : keys) {
+ info("Put " + key);
+
+ cache.put(key, 2);
+ }
+
+ info("Commit.");
+
+ tx.commit();
+ }
+
+ fail("Transaction should fail.");
+ }
+ catch (IgniteCheckedException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ for (Integer key : keys)
+ checkValue(key, putBefore);
+ }
+
+ /**
+ * @param key Key.
+ * @throws Exception If failed.
+ */
+ private void checkValue(final Integer key, boolean putBefore) throws Exception {
+ store.forceFail(false);
+
+ info("Check key: " + key);
+
+ for (int i = 0; i < gridCount(); i++) {
+ GridKernal grid = (GridKernal) grid(i);
+
+ GridCacheAdapter cache = grid.internalCache(null);
+
+ GridCacheMapEntry entry = cache.map().getEntry(key);
+
+ log.info("Entry: " + entry);
+
+ if (entry != null) {
+ assertFalse("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', entry.lockedByAny());
+ assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore,
+ entry.hasValue());
+ assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore ? 1 : null,
+ entry.rawGetOrUnmarshal(false));
+ }
+
+ if (cache.isNear()) {
+ entry = ((GridNearCacheAdapter)cache).dht().map().getEntry(key);
+
+ log.info("Dht entry: " + entry);
+
+ if (entry != null) {
+ assertFalse("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', entry.lockedByAny());
+ assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore,
+ entry.hasValue());
+ assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore ? 1 : null,
+ entry.rawGetOrUnmarshal(false));
+ }
+ }
+ }
+
+ for (int i = 0; i < gridCount(); i++)
+ assertEquals("Unexpected value for grid " + i, putBefore ? 1 : null, grid(i).cache(null).get(key));
+ }
+
+ /**
+ * @param putBefore If {@code true} then puts some value before executing failing operation.
+ * @param key Key.
+ * @throws Exception If failed.
+ */
+ private void checkPut(boolean putBefore, final Integer key) throws Exception {
+ if (putBefore) {
+ store.forceFail(false);
+
+ info("Put key: " + key);
+
+ grid(0).cache(null).put(key, 1);
+ }
+
+ // Execute get from all nodes to create readers for near cache.
+ for (int i = 0; i < gridCount(); i++)
+ grid(i).cache(null).get(key);
+
+ store.forceFail(true);
+
+ info("Going to put: " + key);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ grid(0).cache(null).put(key, 2);
+
+ return null;
+ }
+ }, GridCacheTxRollbackException.class, null);
+
+ checkValue(key, putBefore);
+ }
+
+ /**
+ * @param putBefore If {@code true} then puts some value before executing failing operation.
+ * @param key Key.
+ * @throws Exception If failed.
+ */
+ private void checkTransform(boolean putBefore, final Integer key) throws Exception {
+ if (putBefore) {
+ store.forceFail(false);
+
+ info("Put key: " + key);
+
+ grid(0).cache(null).put(key, 1);
+ }
+
+ // Execute get from all nodes to create readers for near cache.
+ for (int i = 0; i < gridCount(); i++)
+ grid(i).cache(null).get(key);
+
+ store.forceFail(true);
+
+ info("Going to transform: " + key);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ grid(0).cache(null).transform(key, new IgniteClosure<Object, Object>() {
+ @Override public Object apply(Object o) {
+ return 2;
+ }
+ });
+
+ return null;
+ }
+ }, GridCacheTxRollbackException.class, null);
+
+ checkValue(key, putBefore);
+ }
+
+ /**
+ * @param putBefore If {@code true} then puts some value before executing failing operation.
+ * @param keys Keys.
+ * @throws Exception If failed.
+ */
+ private void checkPutAll(boolean putBefore, Integer ... keys) throws Exception {
+ assert keys.length > 1;
+
+ if (putBefore) {
+ store.forceFail(false);
+
+ Map<Integer, Integer> m = new HashMap<>();
+
+ for (Integer key : keys)
+ m.put(key, 1);
+
+ info("Put data: " + m);
+
+ grid(0).cache(null).putAll(m);
+ }
+
+ // Execute get from all nodes to create readers for near cache.
+ for (int i = 0; i < gridCount(); i++) {
+ for (Integer key : keys)
+ grid(i).cache(null).get(key);
+ }
+
+ store.forceFail(true);
+
+ final Map<Integer, Integer> m = new HashMap<>();
+
+ for (Integer key : keys)
+ m.put(key, 2);
+
+ info("Going to putAll: " + m);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ grid(0).cache(null).putAll(m);
+
+ return null;
+ }
+ }, GridCacheTxRollbackException.class, null);
+
+ for (Integer key : m.keySet())
+ checkValue(key, putBefore);
+ }
+
+ /**
+ * @param putBefore If {@code true} then puts some value before executing failing operation.
+ * @param key Key.
+ * @throws Exception If failed.
+ */
+ private void checkRemove(boolean putBefore, final Integer key) throws Exception {
+ if (putBefore) {
+ store.forceFail(false);
+
+ info("Put key: " + key);
+
+ grid(0).cache(null).put(key, 1);
+ }
+
+ // Execute get from all nodes to create readers for near cache.
+ for (int i = 0; i < gridCount(); i++)
+ grid(i).cache(null).get(key);
+
+ store.forceFail(true);
+
+ info("Going to remove: " + key);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ grid(0).cache(null).remove(key);
+
+ return null;
+ }
+ }, GridCacheTxRollbackException.class, null);
+
+ checkValue(key, putBefore);
+ }
+
+ /**
+ * Generates key of a given type for given node.
+ *
+ * @param node Node.
+ * @param type Key type.
+ * @return Key.
+ */
+ private Integer keyForNode(ClusterNode node, int type) {
+ GridCache<Integer, Integer> cache = grid(0).cache(null);
+
+ if (cache.configuration().getCacheMode() == LOCAL)
+ return ++lastKey;
+
+ if (cache.configuration().getCacheMode() == REPLICATED && type == NOT_PRIMARY_AND_BACKUP)
+ return ++lastKey;
+
+ for (int key = lastKey + 1; key < (lastKey + 10_000); key++) {
+ switch (type) {
+ case NOT_PRIMARY_AND_BACKUP: {
+ if (!cache.affinity().isPrimaryOrBackup(node, key)) {
+ lastKey = key;
+
+ return key;
+ }
+
+ break;
+ }
+
+ case PRIMARY: {
+ if (cache.affinity().isPrimary(node, key)) {
+ lastKey = key;
+
+ return key;
+ }
+
+ break;
+ }
+
+ case BACKUP: {
+ if (cache.affinity().isBackup(node, key)) {
+ lastKey = key;
+
+ return key;
+ }
+
+ break;
+ }
+
+ default:
+ fail();
+ }
+ }
+
+ throw new IllegalStateException("Failed to find key.");
+ }
+
+ /**
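+ * Test store whose put/remove operations, {@code loadCache} and committing {@code txEnd} throw while the fail flag is set.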
+ */
+ private static class TestStore implements GridCacheStore<Object, Object> {
+ /** Fail flag. */
+ private volatile boolean fail;
+
+ /**
+ * @param fail Fail flag.
+ */
+ public void forceFail(boolean fail) {
+ this.fail = fail;
+ }
+
+
+ @Nullable @Override public Object load(@Nullable IgniteTx tx, Object key) throws IgniteCheckedException {
+ return null;
+ }
+
+ @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... args)
+ throws IgniteCheckedException {
+ if (fail)
+ throw new IgniteCheckedException("Store exception");
+ }
+
+ @Override public void loadAll(@Nullable IgniteTx tx, Collection<?> keys, IgniteBiInClosure<Object, Object> c)
+ throws IgniteCheckedException {
+ }
+
+ @Override public void put(@Nullable IgniteTx tx, Object key, Object val) throws IgniteCheckedException {
+ if (fail)
+ throw new IgniteCheckedException("Store exception");
+ }
+
+ @Override public void putAll(@Nullable IgniteTx tx, Map<?, ?> map) throws IgniteCheckedException {
+ if (fail)
+ throw new IgniteCheckedException("Store exception");
+ }
+
+ @Override public void remove(@Nullable IgniteTx tx, Object key) throws IgniteCheckedException {
+ if (fail)
+ throw new IgniteCheckedException("Store exception");
+ }
+
+ @Override public void removeAll(@Nullable IgniteTx tx, Collection<?> keys) throws IgniteCheckedException {
+ if (fail)
+ throw new IgniteCheckedException("Store exception");
+ }
+
+ @Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException {
+ if (fail && commit)
+ throw new IgniteCheckedException("Store exception");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAtomicLongApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAtomicLongApiSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAtomicLongApiSelfTest.java
index 1291faf..8026483 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAtomicLongApiSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAtomicLongApiSelfTest.java
@@ -464,7 +464,7 @@ public class GridCacheAtomicLongApiSelfTest extends GridCommonAbstractTest {
assertEquals(0, cache.primarySize());
- try (GridCacheTx tx = cache.txStart()) {
+ try (IgniteTx tx = cache.txStart()) {
long newVal = RND.nextLong();
long curAtomicVal = atomic.get();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java
index 9d165cc..db7115b 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java
@@ -220,7 +220,7 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends GridCommonAbs
* @throws Exception If failed.
*/
public void testGetAndAddInTx() throws Exception {
- try (GridCacheTx tx = grid().cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (IgniteTx tx = grid().cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) {
for (int i = 1; i < MAX_LOOPS_NUM; i++) {
for (GridCacheAtomicSequence seq : seqArr)
getAndAdd(seq, i);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedNodeRestartTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedNodeRestartTxSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedNodeRestartTxSelfTest.java
index ead25ef..5ed6edd 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedNodeRestartTxSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedNodeRestartTxSelfTest.java
@@ -147,7 +147,7 @@ public class GridCachePartitionedNodeRestartTxSelfTest extends GridCommonAbstrac
assert PARTITIONED == grid(i).cache(null).configuration().getCacheMode();
- try (GridCacheTx tx = grid(i).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (IgniteTx tx = grid(i).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) {
Integer val = (Integer) grid(i).cache(null).get(key);
assertEquals("Simple check failed for node: " + i, (Integer) i, val);
@@ -172,7 +172,7 @@ public class GridCachePartitionedNodeRestartTxSelfTest extends GridCommonAbstrac
assert PARTITIONED == grid(i).cache(null).configuration().getCacheMode();
- try (GridCacheTx tx = grid(i).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (IgniteTx tx = grid(i).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) {
GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
GridCacheAtomicLongValue atomicVal = ((GridCacheAtomicLongValue) grid(i).cache(null).get(key));
@@ -230,7 +230,7 @@ public class GridCachePartitionedNodeRestartTxSelfTest extends GridCommonAbstrac
// Init cache data.
- try (GridCacheTx tx = grid(0).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (IgniteTx tx = grid(0).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) {
// Put simple value.
grid(0).cache(null).put(key, INIT_GRID_NUM);
@@ -253,7 +253,7 @@ public class GridCachePartitionedNodeRestartTxSelfTest extends GridCommonAbstrac
// Init cache data.
- try (GridCacheTx tx = grid(0).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (IgniteTx tx = grid(0).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) {
// Put custom data
grid(0).cache(null).put(new GridCacheInternalKeyImpl(key), new GridCacheAtomicLongValue(INIT_GRID_NUM));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java
index 2cb26c6..84340a9 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java
@@ -151,7 +151,7 @@ public class GridCachePartitionedQueueCreateMultiNodeSelfTest extends GridCommon
info("Partition: " + cache.affinity().partition(1));
- try (GridCacheTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
// info("Getting value for key 1");
String s = cache.get(1);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java
index 3467e53..24a33bb 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java
@@ -328,7 +328,7 @@ public abstract class GridCacheAbstractDistributedByteArrayValuesSelfTest extend
private void testTransactionMixed0(GridCache<Integer, Object>[] caches, GridCacheTxConcurrency concurrency,
Integer key1, byte[] val1, @Nullable Integer key2, @Nullable Object val2) throws Exception {
for (GridCache<Integer, Object> cache : caches) {
- GridCacheTx tx = cache.txStart(concurrency, REPEATABLE_READ);
+ IgniteTx tx = cache.txStart(concurrency, REPEATABLE_READ);
try {
cache.put(key1, val1);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java
index 45e20ae..5773714 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java
@@ -125,7 +125,7 @@ public abstract class GridCacheAbstractJobExecutionTest extends GridCommonAbstra
@Override public Void applyx(final Integer i) throws IgniteCheckedException {
GridCache<String, int[]> cache = this.ignite.cache(null);
- try (GridCacheTx tx = cache.txStart(concur, isolation)) {
+ try (IgniteTx tx = cache.txStart(concur, isolation)) {
int[] arr = cache.get("TestKey");
if (arr == null)
@@ -159,7 +159,7 @@ public abstract class GridCacheAbstractJobExecutionTest extends GridCommonAbstra
// Do within transaction to make sure that lock is acquired
// which means that all previous transactions have committed.
- try (GridCacheTx tx = c.txStart(concur, isolation)) {
+ try (IgniteTx tx = c.txStart(concur, isolation)) {
int[] arr = c.get("TestKey");
assertNotNull(arr);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
index 7cd73cc..372639f 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
@@ -623,7 +623,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
int c = 0;
try {
- try (GridCacheTx tx = cache.txStart(txConcurrency(), REPEATABLE_READ)) {
+ try (IgniteTx tx = cache.txStart(txConcurrency(), REPEATABLE_READ)) {
c = txCntr.incrementAndGet();
if (c % logFreq == 0)
@@ -772,7 +772,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
int c = 0;
- try (GridCacheTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
c = txCntr.incrementAndGet();
if (c % logFreq == 0)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractPrimarySyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractPrimarySyncSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractPrimarySyncSelfTest.java
index 1c12f51..1c8bf17 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractPrimarySyncSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractPrimarySyncSelfTest.java
@@ -84,7 +84,7 @@ public abstract class GridCacheAbstractPrimarySyncSelfTest extends GridCommonAbs
GridCache<Integer, Integer> cache = grid(j).cache(null);
if (cache.entry(i).primary()) {
- try (GridCacheTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache.put(i, i);
tx.commit();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java
index 155ceee..a05eacc 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java
@@ -252,7 +252,7 @@ public abstract class GridCacheBasicOpAbstractTest extends GridCommonAbstractTes
ignite2.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED);
ignite3.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED);
- GridCacheTx tx = cache1.txStart(OPTIMISTIC, READ_COMMITTED, 0, 0);
+ IgniteTx tx = cache1.txStart(OPTIMISTIC, READ_COMMITTED, 0, 0);
try {
cache1.put("tx1", "val1");
@@ -317,7 +317,7 @@ public abstract class GridCacheBasicOpAbstractTest extends GridCommonAbstractTes
GridCache<String, String> cache2 = ignite2.cache(null);
GridCache<String, String> cache3 = ignite3.cache(null);
- GridCacheTx tx = cache1.txStart();
+ IgniteTx tx = cache1.txStart();
cache1.put("key", "val");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java
index b833381..a89ced2 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java
@@ -82,7 +82,7 @@ public abstract class GridCacheEntrySetAbstractSelfTest extends GridCacheAbstrac
* @throws Exception If failed.
*/
private void putAndCheckEntrySet(GridCache<Object, Object> cache) throws Exception {
- try (GridCacheTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
Integer total = (Integer) cache.get(TX_KEY);
if (total == null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEventAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEventAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEventAbstractTest.java
index 9cc8f32..6801ffc 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEventAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEventAbstractTest.java
@@ -253,7 +253,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
String key = e.getKey();
Integer val = e.getValue();
- GridCacheTx tx = cache.txStart();
+ IgniteTx tx = cache.txStart();
assert cache.put(key, val) == null;
@@ -285,7 +285,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
String key = e.getKey();
Integer val = e.getValue();
- GridCacheTx tx = cache.txStart();
+ IgniteTx tx = cache.txStart();
assert cache.put(key, val) == null;
@@ -351,7 +351,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
String key = e.getKey();
Integer val = e.getValue();
- GridCacheTx tx = cache.txStart();
+ IgniteTx tx = cache.txStart();
assert cache.putAsync(key, val).get() == null;
@@ -383,7 +383,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
String key = e.getKey();
Integer val = e.getValue();
- GridCacheTx tx = cache.txStart();
+ IgniteTx tx = cache.txStart();
assert cache.putAsync(key, val).get() == null;
@@ -443,7 +443,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
String key = e.getKey();
Integer val = e.getValue();
- GridCacheTx tx = cache.txStart();
+ IgniteTx tx = cache.txStart();
assert cache.putx(key, val);
@@ -473,7 +473,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
String key = e.getKey();
Integer val = e.getValue();
- GridCacheTx tx = cache.txStart();
+ IgniteTx tx = cache.txStart();
assert cache.putx(key, val);
@@ -533,7 +533,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
@Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException {
Iterator<Map.Entry<String, Integer>> iter = pairs(2).entrySet().iterator();
- GridCacheTx tx = cache.txStart();
+ IgniteTx tx = cache.txStart();
Map.Entry<String, Integer> e = iter.next();
@@ -606,7 +606,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
Iterator<Map.Entry<String, Integer>> iter = pairs(2).entrySet().iterator();
// Optimistic transaction.
- GridCacheTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ);
+ IgniteTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ);
Map.Entry<String, Integer> e = iter.next();
@@ -683,7 +683,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
Integer val = e.getValue();
// Optimistic.
- GridCacheTx tx = cache.txStart();
+ IgniteTx tx = cache.txStart();
assert !cache.putx(key, val, hasPeekVal);
assert cache.putx(key, val, noPeekVal);
@@ -718,7 +718,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
String key = e.getKey();
Integer val = e.getValue();
- GridCacheTx tx = cache.txStart();
+ IgniteTx tx = cache.txStart();
assert !cache.putx(key, val, hasPeekVal);
assert cache.putx(key, val, noPeekVal);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMultithreadedFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMultithreadedFailoverAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMultithreadedFailoverAbstractTest.java
index 1febbc4..c128bb3 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMultithreadedFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMultithreadedFailoverAbstractTest.java
@@ -293,7 +293,7 @@ public class GridCacheMultithreadedFailoverAbstractTest extends GridCommonAbstra
}
}
try {
- GridCacheTx tx = atomicityMode() == TRANSACTIONAL ? cache.txStart() : null;
+ IgniteTx tx = atomicityMode() == TRANSACTIONAL ? cache.txStart() : null;
try {
cache.putAll(putMap);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java
index 7d6af24..4238c15 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java
@@ -157,7 +157,7 @@ public abstract class GridCacheNodeFailureAbstractTest extends GridCommonAbstrac
GridCache<Integer, String> cache = cache(idx);
- GridCacheTx tx = cache.txStart(concurrency, isolation);
+ IgniteTx tx = cache.txStart(concurrency, isolation);
try {
cache.put(KEY, VALUE);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java
index bd6cf00..82c0bae 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java
@@ -144,17 +144,17 @@ public abstract class GridCachePartitionedReloadAllAbstractSelfTest extends Grid
c.apply(e.getKey(), e.getValue());
}
- @Override public String load(GridCacheTx tx, Integer key) {
+ @Override public String load(IgniteTx tx, Integer key) {
X.println("Loading on: " + caches.indexOf(g.<Integer, String>cache(null)) + " key=" + key);
return map.get(key);
}
- @Override public void put(GridCacheTx tx, Integer key, @Nullable String val) {
+ @Override public void put(IgniteTx tx, Integer key, @Nullable String val) {
fail("Should not be called within the test.");
}
- @Override public void remove(GridCacheTx tx, Integer key) {
+ @Override public void remove(IgniteTx tx, Integer key) {
fail("Should not be called within the test.");
}
};
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxConsistencyRestartAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxConsistencyRestartAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxConsistencyRestartAbstractSelfTest.java
deleted file mode 100644
index b7bea07..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxConsistencyRestartAbstractSelfTest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.transactions.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.gridgain.testframework.junits.common.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
-import static org.gridgain.grid.cache.GridCachePreloadMode.*;
-import static org.apache.ignite.transactions.GridCacheTxConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.GridCacheTxIsolation.REPEATABLE_READ;
-import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*;
-
-/**
- *
- */
-public abstract class GridCacheTxConsistencyRestartAbstractSelfTest extends GridCommonAbstractTest {
- /** IP finder. */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** Grid count. */
- private static final int GRID_CNT = 4;
-
- /** Key range. */
- private static final int RANGE = 100_000;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
- discoSpi.setIpFinder(IP_FINDER);
-
- cfg.setDiscoverySpi(discoSpi);
-
- cfg.setCacheConfiguration(cacheConfiguration(gridName));
-
- return cfg;
- }
-
- /**
- * @param gridName Grid name.
- * @return Cache configuration.
- */
- public GridCacheConfiguration cacheConfiguration(String gridName) {
- GridCacheConfiguration ccfg = new GridCacheConfiguration();
-
- ccfg.setAtomicityMode(TRANSACTIONAL);
- ccfg.setCacheMode(cacheMode());
- ccfg.setWriteSynchronizationMode(FULL_SYNC);
- ccfg.setDistributionMode(partitionDistributionMode());
- ccfg.setPreloadMode(SYNC);
-
- if (cacheMode() == GridCacheMode.PARTITIONED)
- ccfg.setBackups(1);
-
- return ccfg;
- }
-
- /**
- * @return Cache mode.
- */
- protected abstract GridCacheMode cacheMode();
-
- /**
- * @return Partition distribution mode for PARTITIONED cache.
- */
- protected abstract GridCacheDistributionMode partitionDistributionMode();
-
- /**
- * @throws Exception If failed.
- */
- public void testTxConsistency() throws Exception {
- startGridsMultiThreaded(GRID_CNT);
-
- IgniteDataLoader<Object, Object> ldr = grid(0).dataLoader(null);
-
- for (int i = 0; i < RANGE; i++) {
- ldr.addData(i, 0);
-
- if (i > 0 && i % 1000 == 0)
- info("Put keys: " + i);
- }
-
- ldr.close();
-
- final AtomicBoolean done = new AtomicBoolean(false);
-
- Thread restartThread = new Thread() {
- @Override public void run() {
- Random rnd = new Random();
-
- while (!done.get()) {
- try {
- int idx = rnd.nextInt(GRID_CNT);
-
- stopGrid(idx);
-
- startGrid(idx);
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- };
-
- restartThread.start();
-
- Random rnd = new Random();
-
- // Make some iterations with 1-3 keys transactions.
- for (int i = 0; i < 50_000; i++) {
- int idx = i % GRID_CNT;
-
- if (i > 0 && i % 1000 == 0)
- info("Running iteration: " + i);
-
- try {
- GridKernal grid = (GridKernal)grid(idx);
-
- GridCache<Integer, Integer> cache = grid.cache(null);
-
- List<Integer> keys = new ArrayList<>();
-
- int keyCnt = rnd.nextInt(3);
-
- for (int k = 0; k < keyCnt; k++)
- keys.add(rnd.nextInt(RANGE));
-
- Collections.sort(keys);
-
- try (GridCacheTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
- Map<Integer, Integer> map = cache.getAll(keys);
-
- for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
- assertNotNull("Null value received from cache [key=" + entry.getKey() + "]", entry.getValue());
-
- cache.put(entry.getKey(), entry.getValue() + 1);
- }
-
- tx.commit();
- }
- }
- catch (Exception e) {
- info("Failed to update keys: " + e.getMessage());
- }
- }
-
- done.set(true);
-
- restartThread.join();
-
- for (int k = 0; k < RANGE; k++) {
- Integer val = null;
-
- for (int i = 0; i < GRID_CNT; i++) {
- GridEx grid = grid(i);
-
- GridCache<Integer, Integer> cache = grid.cache(null);
-
- if (cache.affinity().isPrimaryOrBackup(grid.localNode(), k)) {
- if (val == null) {
- val = cache.peek(k);
-
- assertNotNull("Failed to peek value for key: " + k, val);
- }
- else
- assertEquals("Failed to find value in cache [primary=" +
- cache.affinity().isPrimary(grid.localNode(), k) + ']',
- val, cache.peek(k));
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxOriginatingNodeFailureAbstractSelfTest.java
deleted file mode 100644
index 4b15e83..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxOriginatingNodeFailureAbstractSelfTest.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.*;
-import org.gridgain.grid.kernal.managers.communication.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.apache.ignite.spi.communication.tcp.*;
-import org.gridgain.grid.util.direct.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.lang.*;
-import org.gridgain.testframework.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
-
-/**
- * Abstract test for originating node failure.
- */
-public abstract class GridCacheTxOriginatingNodeFailureAbstractSelfTest extends GridCacheAbstractSelfTest {
- /** */
- protected static final int GRID_CNT = 5;
-
- /** Ignore node ID. */
- private volatile UUID ignoreMsgNodeId;
-
- /** Ignore message class. */
- private Class<?> ignoreMsgCls;
-
- /**
- * @throws Exception If failed.
- */
- public void testManyKeysCommit() throws Exception {
- Collection<Integer> keys = new ArrayList<>(200);
-
- for (int i = 0; i < 200; i++)
- keys.add(i);
-
- testTxOriginatingNodeFails(keys, false);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testManyKeysRollback() throws Exception {
- Collection<Integer> keys = new ArrayList<>(200);
-
- for (int i = 0; i < 200; i++)
- keys.add(i);
-
- testTxOriginatingNodeFails(keys, true);
- }
-
- /**
- * @return Index of node starting transaction.
- */
- protected int originatingNode() {
- return 0;
- }
-
- /**
- * Ignores messages to given node of given type.
- *
- * @param dstNodeId Destination node ID.
- * @param msgCls Message type.
- */
- protected void ignoreMessages(UUID dstNodeId, Class<?> msgCls) {
- ignoreMsgNodeId = dstNodeId;
- ignoreMsgCls = msgCls;
- }
-
- /**
- * Gets ignore message class to simulate partial prepare message.
- *
- * @return Ignore message class.
- */
- protected abstract Class<?> ignoreMessageClass();
-
- /**
- * @param keys Keys to update.
- * @param partial Flag indicating whether to simulate partial prepared state.
- * @throws Exception If failed.
- */
- protected void testTxOriginatingNodeFails(Collection<Integer> keys, final boolean partial) throws Exception {
- assertFalse(keys.isEmpty());
-
- final Collection<GridKernal> grids = new ArrayList<>();
-
- ClusterNode txNode = grid(originatingNode()).localNode();
-
- for (int i = 1; i < gridCount(); i++)
- grids.add((GridKernal)grid(i));
-
- final Map<Integer, String> map = new HashMap<>();
-
- final String initVal = "initialValue";
-
- for (Integer key : keys) {
- grid(originatingNode()).cache(null).put(key, initVal);
-
- map.put(key, String.valueOf(key));
- }
-
- Map<Integer, Collection<ClusterNode>> nodeMap = new HashMap<>();
-
- GridCacheAdapter<Integer, String> cache = ((GridKernal)grid(1)).internalCache();
-
- info("Node being checked: " + grid(1).localNode().id());
-
- for (Integer key : keys) {
- Collection<ClusterNode> nodes = new ArrayList<>();
-
- nodes.addAll(cache.affinity().mapKeyToPrimaryAndBackups(key));
-
- nodes.remove(txNode);
-
- nodeMap.put(key, nodes);
- }
-
- info("Starting tx [values=" + map + ", topVer=" +
- ((GridKernal)grid(1)).context().discovery().topologyVersion() + ']');
-
- if (partial)
- ignoreMessages(grid(1).localNode().id(), ignoreMessageClass());
-
- final Ignite txIgniteNode = G.ignite(txNode.id());
-
- GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- GridCache<Integer, String> cache = txIgniteNode.cache(null);
-
- assertNotNull(cache);
-
- GridCacheTxProxyImpl tx = (GridCacheTxProxyImpl)cache.txStart();
-
- GridCacheTxEx txEx = GridTestUtils.getFieldValue(tx, "tx");
-
- cache.putAll(map);
-
- try {
- txEx.prepareAsync().get(3, TimeUnit.SECONDS);
- }
- catch (IgniteFutureTimeoutException ignored) {
- info("Failed to wait for prepare future completion: " + partial);
- }
-
- return null;
- }
- }).get();
-
- info("Stopping originating node " + txNode);
-
- G.stop(G.ignite(txNode.id()).name(), true);
-
- info("Stopped grid, waiting for transactions to complete.");
-
- boolean txFinished = GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- for (GridKernal g : grids) {
- GridCacheSharedContext<Object, Object> ctx = g.context().cache().context();
-
- int txNum = ctx.tm().idMapSize();
-
- if (txNum != 0)
- return false;
- }
-
- return true;
- }
- }, 10000);
-
- assertTrue(txFinished);
-
- info("Transactions finished.");
-
- for (Map.Entry<Integer, Collection<ClusterNode>> e : nodeMap.entrySet()) {
- final Integer key = e.getKey();
-
- final String val = map.get(key);
-
- assertFalse(e.getValue().isEmpty());
-
- for (ClusterNode node : e.getValue()) {
- compute(G.ignite(node.id()).cluster().forNode(node)).call(new Callable<Void>() {
- /** */
- @IgniteInstanceResource
- private Ignite ignite;
-
- @Override public Void call() throws Exception {
- GridCache<Integer, String> cache = ignite.cache(null);
-
- assertNotNull(cache);
-
- assertEquals(partial ? initVal : val, cache.peek(key));
-
- return null;
- }
- });
- }
- }
-
- for (Map.Entry<Integer, String> e : map.entrySet()) {
- for (Ignite g : G.allGrids()) {
- UUID locNodeId = g.cluster().localNode().id();
-
- assertEquals("Check failed for node: " + locNodeId, partial ? initVal : e.getValue(),
- g.cache(null).get(e.getKey()));
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setCommunicationSpi(new TcpCommunicationSpi() {
- @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg)
- throws IgniteSpiException {
- if (!F.eq(ignoreMsgNodeId, node.id()) || !ignoredMessage((GridIoMessage)msg))
- super.sendMessage(node, msg);
- }
- });
-
- cfg.getTransactionsConfiguration().setDefaultTxConcurrency(OPTIMISTIC);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception {
- GridCacheConfiguration cfg = super.cacheConfiguration(gridName);
-
- cfg.setStore(null);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return GRID_CNT;
- }
-
- /** {@inheritDoc} */
- @Override protected abstract GridCacheMode cacheMode();
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- startGridsMultiThreaded(GRID_CNT);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- // No-op
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
-
- ignoreMsgCls = null;
- ignoreMsgNodeId = null;
- }
-
- /**
- * Checks if message should be ignored.
- *
- * @param msg Message.
- * @return {@code True} if message should be ignored.
- */
- private boolean ignoredMessage(GridIoMessage msg) {
- return ignoreMsgCls != null && ignoreMsgCls.isAssignableFrom(msg.message().getClass());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
deleted file mode 100644
index 4d257f6..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
+++ /dev/null
@@ -1,488 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.*;
-import org.apache.ignite.transactions.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.*;
-import org.gridgain.grid.kernal.managers.communication.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
-import org.apache.ignite.spi.communication.tcp.*;
-import org.gridgain.grid.util.direct.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.lang.*;
-import org.gridgain.testframework.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
-
-/**
- * Abstract test for originating node failure.
- */
-public abstract class GridCacheTxPessimisticOriginatingNodeFailureAbstractSelfTest extends GridCacheAbstractSelfTest {
- /** */
- protected static final int GRID_CNT = 5;
-
- /** Ignore node ID. */
- private volatile Collection<UUID> ignoreMsgNodeIds;
-
- /** Ignore message class. */
- private Collection<Class<?>> ignoreMsgCls;
-
- /** Failing node ID. */
- private UUID failingNodeId;
-
- /**
- * @throws Exception If failed.
- */
- public void testManyKeysCommit() throws Exception {
- Collection<Integer> keys = new ArrayList<>(200);
-
- for (int i = 0; i < 200; i++)
- keys.add(i);
-
- testTxOriginatingNodeFails(keys, false);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testManyKeysRollback() throws Exception {
- Collection<Integer> keys = new ArrayList<>(200);
-
- for (int i = 0; i < 200; i++)
- keys.add(i);
-
- testTxOriginatingNodeFails(keys, true);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPrimaryNodeFailureCommit() throws Exception {
- checkPrimaryNodeCrash(true);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPrimaryNodeFailureRollback() throws Exception {
- checkPrimaryNodeCrash(false);
- }
-
- /**
- * @return Index of node starting transaction.
- */
- protected int originatingNode() {
- return 0;
- }
-
- /**
- * Ignores messages to given node of given type.
- *
- * @param dstNodeIds Destination node IDs.
- * @param msgCls Message type.
- */
- protected void ignoreMessages(Collection<Class<?>> msgCls, Collection<UUID> dstNodeIds) {
- ignoreMsgNodeIds = dstNodeIds;
- ignoreMsgCls = msgCls;
- }
-
- /**
- * Gets ignore message class to simulate partial prepare message.
- *
- * @return Ignore message class.
- */
- protected abstract Collection<Class<?>> ignoreMessageClasses();
-
- /**
- * @param keys Keys to update.
- * @param fullFailure Flag indicating whether to simulate rollback state.
- * @throws Exception If failed.
- */
- protected void testTxOriginatingNodeFails(Collection<Integer> keys, final boolean fullFailure) throws Exception {
- assertFalse(keys.isEmpty());
-
- final Collection<GridKernal> grids = new ArrayList<>();
-
- ClusterNode txNode = grid(originatingNode()).localNode();
-
- for (int i = 1; i < gridCount(); i++)
- grids.add((GridKernal)grid(i));
-
- failingNodeId = grid(0).localNode().id();
-
- final Map<Integer, String> map = new HashMap<>();
-
- final String initVal = "initialValue";
-
- for (Integer key : keys) {
- grid(originatingNode()).cache(null).put(key, initVal);
-
- map.put(key, String.valueOf(key));
- }
-
- Map<Integer, Collection<ClusterNode>> nodeMap = new HashMap<>();
-
- GridCacheAdapter<Integer, String> cache = ((GridKernal)grid(1)).internalCache();
-
- info("Node being checked: " + grid(1).localNode().id());
-
- for (Integer key : keys) {
- Collection<ClusterNode> nodes = new ArrayList<>();
-
- nodes.addAll(cache.affinity().mapKeyToPrimaryAndBackups(key));
-
- nodes.remove(txNode);
-
- nodeMap.put(key, nodes);
- }
-
- info("Starting tx [values=" + map + ", topVer=" +
- ((GridKernal)grid(1)).context().discovery().topologyVersion() + ']');
-
- if (fullFailure)
- ignoreMessages(ignoreMessageClasses(), allNodeIds());
- else
- ignoreMessages(ignoreMessageClasses(), F.asList(grid(1).localNode().id()));
-
- final GridEx originatingNodeGrid = grid(originatingNode());
-
- GridTestUtils.runAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
- GridCache<Integer, String> cache = originatingNodeGrid.cache(null);
-
- assertNotNull(cache);
-
- GridCacheTx tx = cache.txStart();
-
- try {
- cache.putAll(map);
-
- info("Before commitAsync");
-
- IgniteFuture<GridCacheTx> fut = tx.commitAsync();
-
- info("Got future for commitAsync().");
-
- fut.get(3, TimeUnit.SECONDS);
- }
- catch (IgniteFutureTimeoutException ignored) {
- info("Failed to wait for commit future completion [fullFailure=" + fullFailure + ']');
- }
-
- return null;
- }
- }).get();
-
- info(">>> Stopping originating node " + txNode);
-
- G.stop(grid(originatingNode()).name(), true);
-
- ignoreMessages(Collections.<Class<?>>emptyList(), Collections.<UUID>emptyList());
-
- info(">>> Stopped originating node: " + txNode.id());
-
- boolean txFinished = GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- for (GridKernal g : grids) {
- GridCacheAdapter<?, ?> cache = g.internalCache();
-
- GridCacheTxManager txMgr = cache.isNear() ?
- ((GridNearCacheAdapter)cache).dht().context().tm() :
- cache.context().tm();
-
- int txNum = txMgr.idMapSize();
-
- if (txNum != 0)
- return false;
- }
-
- return true;
- }
- }, 10000);
-
- assertTrue(txFinished);
-
- info("Transactions finished.");
-
- for (Map.Entry<Integer, Collection<ClusterNode>> e : nodeMap.entrySet()) {
- final Integer key = e.getKey();
-
- final String val = map.get(key);
-
- assertFalse(e.getValue().isEmpty());
-
- for (ClusterNode node : e.getValue()) {
- final UUID checkNodeId = node.id();
-
- compute(G.ignite(checkNodeId).cluster().forNode(node)).call(new Callable<Void>() {
- /** */
- @IgniteInstanceResource
- private Ignite ignite;
-
- @Override public Void call() throws Exception {
- GridCache<Integer, String> cache = ignite.cache(null);
-
- assertNotNull(cache);
-
- assertEquals("Failed to check entry value on node: " + checkNodeId,
- fullFailure ? initVal : val, cache.peek(key));
-
- return null;
- }
- });
- }
- }
-
- for (Map.Entry<Integer, String> e : map.entrySet()) {
- for (Ignite g : G.allGrids())
- assertEquals(fullFailure ? initVal : e.getValue(), g.cache(null).get(e.getKey()));
- }
- }
-
- /**
- * Checks tx data consistency in case when primary node crashes.
- * <p>
- * Scenario: keys {@code 0..19} are first written with an initial value, then a transaction started on
- * {@code grid(0)} reads and updates all of them. While the transaction is still open, {@code grid(1)}
- * is stopped, after which the transaction is committed or rolled back. The test then waits until the
- * transaction managers of the remaining grids report no transactions and verifies the values seen on
- * every remaining node.
- *
- * @param commmit Whether to commit or rollback a transaction.
- * @throws Exception If failed.
- */
- private void checkPrimaryNodeCrash(final boolean commmit) throws Exception {
- Collection<Integer> keys = new ArrayList<>(20);
-
- for (int i = 0; i < 20; i++)
- keys.add(i);
-
- // Grids that stay alive after grid(1) is stopped; their tx managers are polled below.
- final Collection<GridKernal> grids = new ArrayList<>();
-
- ClusterNode primaryNode = grid(1).localNode();
-
- for (int i = 0; i < gridCount(); i++) {
- if (i != 1)
- grids.add((GridKernal)grid(i));
- }
-
- // The communication SPI installed in getConfiguration() compares the local node ID against this field.
- failingNodeId = primaryNode.id();
-
- // Values the transaction is going to write: key -> String.valueOf(key).
- final Map<Integer, String> map = new HashMap<>();
-
- final String initVal = "initialValue";
-
- for (Integer key : keys) {
- grid(originatingNode()).cache(null).put(key, initVal);
-
- map.put(key, String.valueOf(key));
- }
-
- // For every key: its primary and backup nodes, without the node that is about to be stopped.
- Map<Integer, Collection<ClusterNode>> nodeMap = new HashMap<>();
-
- GridCache<Integer, String> cache = grid(0).cache(null);
-
- // Verify the cache before its first use, otherwise a missing cache shows up as a bare NPE.
- assertNotNull(cache);
-
- info("Failing node ID: " + grid(1).localNode().id());
-
- for (Integer key : keys) {
- Collection<ClusterNode> nodes = new ArrayList<>();
-
- nodes.addAll(cache.affinity().mapKeyToPrimaryAndBackups(key));
-
- nodes.remove(primaryNode);
-
- nodeMap.put(key, nodes);
- }
-
- info("Starting tx [values=" + map + ", topVer=" +
- ((GridKernal)grid(1)).context().discovery().topologyVersion() + ']');
-
- try (GridCacheTx tx = cache.txStart()) {
- cache.getAll(keys);
-
- // Should not send any messages.
- cache.putAll(map);
-
- // Fail the node in the middle of transaction.
- info(">>> Stopping primary node " + primaryNode);
-
- G.stop(G.ignite(primaryNode.id()).name(), true);
-
- info(">>> Stopped primary node, finishing transaction: " + primaryNode.id());
-
- if (commmit)
- tx.commit();
- else
- tx.rollback();
- }
-
- // Poll until no surviving grid has entries in its tx manager ID map (timeout argument 10000,
- // presumably milliseconds - confirm against GridTestUtils.waitForCondition).
- boolean txFinished = GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- for (GridKernal g : grids) {
- GridCacheAdapter<?, ?> cache = g.internalCache();
-
- // For a near cache the tx manager is taken from the underlying DHT cache context.
- GridCacheTxManager txMgr = cache.isNear() ?
- ((GridNearCacheAdapter)cache).dht().context().tm() :
- cache.context().tm();
-
- int txNum = txMgr.idMapSize();
-
- if (txNum != 0)
- return false;
- }
-
- return true;
- }
- }, 10000);
-
- assertTrue(txFinished);
-
- info("Transactions finished.");
-
- // Check the locally peeked value on each remaining primary/backup node of every key.
- for (Map.Entry<Integer, Collection<ClusterNode>> e : nodeMap.entrySet()) {
- final Integer key = e.getKey();
-
- final String val = map.get(key);
-
- assertFalse(e.getValue().isEmpty());
-
- for (ClusterNode node : e.getValue()) {
- final UUID checkNodeId = node.id();
-
- compute(G.ignite(checkNodeId).cluster().forNode(node)).call(new Callable<Void>() {
- /** */
- @IgniteInstanceResource
- private Ignite ignite;
-
- @Override public Void call() throws Exception {
- GridCache<Integer, String> cache = ignite.cache(null);
-
- assertNotNull(cache);
-
- // Rolled back tx must leave the initial value, committed tx must expose the new one.
- assertEquals("Failed to check entry value on node: " + checkNodeId,
- !commmit ? initVal : val, cache.peek(key));
-
- return null;
- }
- });
- }
- }
-
- // Finally read every key through the public cache API on all grids that are still running.
- for (Map.Entry<Integer, String> e : map.entrySet()) {
- for (Ignite g : G.allGrids())
- assertEquals(!commmit ? initVal : e.getValue(), g.cache(null).get(e.getKey()));
- }
- }
-
- /**
- * Collects the local node ID of every grid with index in {@code [0, gridCount())}.
- *
- * @return IDs of all nodes, in grid index order.
- */
- private Collection<UUID> allNodeIds() {
- Collection<UUID> ids = new ArrayList<>(gridCount());
-
- int idx = 0;
-
- while (idx < gridCount()) {
- UUID nodeId = grid(idx).localNode().id();
-
- ids.add(nodeId);
-
- idx++;
- }
-
- return ids;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Installs a {@code TcpCommunicationSpi} whose {@code sendMessage} returns without sending when all of
- * the following hold: the local node ID equals {@code failingNodeId}, the message is matched by
- * {@code ignoredMessage(...)}, and the destination node ID is contained in {@code ignoreMsgNodeIds}.
- * Any other message is handed to the parent SPI. The default transaction concurrency is set to
- * {@code PESSIMISTIC}.
- */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setCommunicationSpi(new TcpCommunicationSpi() {
- @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg)
- throws IgniteSpiException {
- if (getSpiContext().localNode().id().equals(failingNodeId)) {
- if (ignoredMessage((GridIoMessage)msg)) {
- // Read the field once: it is reassigned by the test and nulled in afterTest(), so the
- // null check and the iteration must work on the same reference.
- Collection<UUID> ignoreIds = ignoreMsgNodeIds;
-
- if (ignoreIds != null) {
- for (UUID ignored : ignoreIds) {
- // Drop the message silently for an ignored destination.
- if (node.id().equals(ignored))
- return;
- }
- }
- }
- }
-
- super.sendMessage(node, msg);
- }
- });
-
- cfg.getTransactionsConfiguration().setDefaultTxConcurrency(PESSIMISTIC);
-
- return cfg;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Takes the configuration built by the parent class and clears its store by passing {@code null}
- * to {@code setStore}.
- */
- @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception {
- GridCacheConfiguration cacheCfg = super.cacheConfiguration(gridName);
-
- // No store for this test.
- cacheCfg.setStore(null);
-
- return cacheCfg;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return The {@code GRID_CNT} constant, the same value that {@code beforeTest()} passes to
- * {@code startGridsMultiThreaded}.
- */
- @Override protected int gridCount() {
- return GRID_CNT;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Re-declared as abstract here, so every concrete subclass has to provide the cache mode itself.
- */
- @Override protected abstract GridCacheMode cacheMode();
-
- /**
- * {@inheritDoc}
- * <p>
- * Intentionally empty in this class: grids are started in {@code beforeTest()} instead.
- */
- @Override protected void beforeTestsStarted() throws Exception {
- // No-op.
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Starts {@code GRID_CNT} grids through {@code startGridsMultiThreaded}; {@code afterTest()} stops
- * them again.
- */
- @Override protected void beforeTest() throws Exception {
- startGridsMultiThreaded(GRID_CNT);
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Intentionally empty in this class: grids are stopped in {@code afterTest()} instead.
- */
- @Override protected void afterTestsStopped() throws Exception {
- // No-op
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Stops all grids and then clears the message filter fields {@code ignoreMsgCls} and
- * {@code ignoreMsgNodeIds}.
- */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
-
- // Reset the filter state read by ignoredMessage() and by the communication SPI.
- // NOTE(review): failingNodeId is not reset here - confirm whether that is intended.
- ignoreMsgCls = null;
- ignoreMsgNodeIds = null;
- }
-
- /**
- * Tells whether the payload of the given I/O message is an instance of one of the classes currently
- * registered in {@code ignoreMsgCls}.
- *
- * @param msg Message.
- * @return {@code True} if the message payload matches a registered class, {@code false} if nothing
- * matches or no classes are registered.
- */
- private boolean ignoredMessage(GridIoMessage msg) {
- // Work on a local copy of the reference so the null check and the loop see the same collection.
- Collection<Class<?>> registered = ignoreMsgCls;
-
- if (registered == null)
- return false;
-
- for (Class<?> candidate : registered) {
- if (candidate.isAssignableFrom(msg.message().getClass()))
- return true;
- }
-
- return false;
- }
-}