You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2014/12/22 00:04:07 UTC
[14/46] incubator-ignite git commit: GG-9141 - Renaming
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxPreloadAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxPreloadAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxPreloadAbstractTest.java
deleted file mode 100644
index 08c4821..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxPreloadAbstractTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed;
-
-import org.apache.ignite.lang.*;
-import org.apache.ignite.transactions.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.testframework.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
-
-/**
- * Tests transaction during cache preloading.
- */
-public abstract class GridCacheTxPreloadAbstractTest extends GridCacheAbstractSelfTest {
- /** */
- private static final int GRID_CNT = 6;
-
- /** */
- private static volatile boolean keyNotLoaded;
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- keyNotLoaded = false;
-
- startGrid(0);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- }
-
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return GRID_CNT;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testRemoteTxPreloading() throws Exception {
- GridCache<String, Integer> cache = cache(0);
-
- for (int i = 0; i < 10000; i++)
- cache.put(String.valueOf(i), 0);
-
- final AtomicInteger gridIdx = new AtomicInteger(1);
-
- IgniteFuture<?> fut = GridTestUtils.runMultiThreadedAsync(
- new Callable<Object>() {
- @Nullable @Override public Object call() throws Exception {
- int idx = gridIdx.getAndIncrement();
-
- startGrid(idx);
-
- return null;
- }
- },
- GRID_CNT - 1,
- "grid-starter-" + getName()
- );
-
- waitForRemoteNodes(grid(0), 2);
-
- Set<String> keys = new HashSet<>();
-
- for (int i = 0; i < 10; i++)
- keys.add(String.valueOf(i * 1000));
-
- cache.transformAll(keys, new C1<Integer, Integer>() {
- @Override public Integer apply(Integer val) {
- if (val == null)
- keyNotLoaded = true;
-
- return val + 1;
- }
- });
-
- assertFalse(keyNotLoaded);
-
- fut.get();
-
- for (int i = 0; i < GRID_CNT; i++)
- // Wait for preloader.
- cache(i).forceRepartition().get();
-
- for (int i = 0; i < GRID_CNT; i++) {
- for (String key : keys)
- assertEquals("Unexpected value for cache " + i, (Integer)1, cache(i).get(key));
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testLocalTxPreloadingOptimistic() throws Exception {
- testLocalTxPreloading(OPTIMISTIC);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testLocalTxPreloadingPessimistic() throws Exception {
- testLocalTxPreloading(PESSIMISTIC);
- }
-
- /**
- * Tries to execute transaction doing transform when target key is not yet preloaded.
- *
- * @param txConcurrency Transaction concurrency;
- * @throws Exception If failed.
- */
- private void testLocalTxPreloading(GridCacheTxConcurrency txConcurrency) throws Exception {
- Map<String, Integer> map = new HashMap<>();
-
- for (int i = 0; i < 10000; i++)
- map.put(String.valueOf(i), 0);
-
- GridCache<String, Integer> cache0 = cache(0);
-
- cache0.putAll(map);
-
- final String TX_KEY = "9000";
-
- int expVal = 0;
-
- for (int i = 1; i < GRID_CNT; i++) {
- assertEquals((Integer)expVal, cache0.get(TX_KEY));
-
- startGrid(i);
-
- GridCache<String, Integer> cache = cache(i);
-
- try (GridCacheTx tx = cache.txStart(txConcurrency, GridCacheTxIsolation.READ_COMMITTED)) {
- cache.transform(TX_KEY, new C1<Integer, Integer>() {
- @Override public Integer apply(Integer val) {
- if (val == null) {
- keyNotLoaded = true;
-
- return 1;
- }
-
- return val + 1;
- }
- });
-
- tx.commit();
- }
-
- assertFalse(keyNotLoaded);
-
- expVal++;
-
- assertEquals((Integer)expVal, cache.get(TX_KEY));
- }
-
- for (int i = 0; i < GRID_CNT; i++)
- assertEquals("Unexpected value for cache " + i, (Integer)expVal, cache(i).get(TX_KEY));
- }
-
- /** {@inheritDoc} */
- @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception {
- GridCacheConfiguration cfg = super.cacheConfiguration(gridName);
-
- cfg.setPreloadMode(GridCachePreloadMode.ASYNC);
-
- cfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
-
- cfg.setStore(null);
-
- return cfg;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxTimeoutAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxTimeoutAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxTimeoutAbstractTest.java
deleted file mode 100644
index 14d4f0b..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxTimeoutAbstractTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed;
-
-import org.apache.ignite.*;
-import org.apache.ignite.transactions.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.testframework.junits.common.*;
-
-import java.util.*;
-
-import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
-import static org.apache.ignite.transactions.GridCacheTxIsolation.*;
-
-/**
- * Simple cache test.
- */
-public class GridCacheTxTimeoutAbstractTest extends GridCommonAbstractTest {
- /** Random number generator. */
- private static final Random RAND = new Random();
-
- /** Grid count. */
- private static final int GRID_COUNT = 2;
-
- /** Grid instances. */
- private static final List<Ignite> IGNITEs = new ArrayList<>();
-
- /** Transaction timeout. */
- private static final long TIMEOUT = 50;
-
- /**
- * @throws Exception If failed.
- */
- @Override protected void beforeTestsStarted() throws Exception {
- for (int i = 0; i < GRID_COUNT; i++)
- IGNITEs.add(startGrid(i));
- }
-
- /**
- * @throws Exception If failed.
- */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
-
- IGNITEs.clear();
- }
-
- /**
- * @param i Grid index.
- * @return Cache.
- */
- @Override protected <K, V> GridCache<K, V> cache(int i) {
- return IGNITEs.get(i).cache(null);
- }
-
- /**
- * @throws IgniteCheckedException If test failed.
- */
- public void testPessimisticReadCommitted() throws Exception {
- checkTransactionTimeout(PESSIMISTIC, READ_COMMITTED);
- }
-
- /**
- * @throws IgniteCheckedException If test failed.
- */
- public void testPessimisticRepeatableRead() throws Exception {
- checkTransactionTimeout(PESSIMISTIC, REPEATABLE_READ);
- }
-
- /**
- * @throws IgniteCheckedException If test failed.
- */
- public void testPessimisticSerializable() throws Exception {
- checkTransactionTimeout(PESSIMISTIC, SERIALIZABLE);
- }
-
- /**
- * @throws IgniteCheckedException If test failed.
- */
- public void testOptimisticReadCommitted() throws Exception {
- checkTransactionTimeout(OPTIMISTIC, READ_COMMITTED);
- }
-
- /**
- * @throws IgniteCheckedException If test failed.
- */
- public void testOptimisticRepeatableRead() throws Exception {
- checkTransactionTimeout(OPTIMISTIC, REPEATABLE_READ);
- }
-
- /**
- * @throws IgniteCheckedException If test failed.
- */
- public void testOptimisticSerializable() throws Exception {
- checkTransactionTimeout(OPTIMISTIC, SERIALIZABLE);
- }
-
- /**
- * @param concurrency Concurrency.
- * @param isolation Isolation.
- * @throws IgniteCheckedException If test failed.
- */
- private void checkTransactionTimeout(GridCacheTxConcurrency concurrency,
- GridCacheTxIsolation isolation) throws Exception {
-
- int idx = RAND.nextInt(GRID_COUNT);
-
- GridCache<Integer, String> cache = cache(idx);
-
- GridCacheTx tx = cache.txStart(concurrency, isolation, TIMEOUT, 0);
-
- try {
- info("Storing value in cache [key=1, val=1]");
-
- cache.put(1, "1");
-
- long sleep = TIMEOUT * 2;
-
- info("Going to sleep for (ms): " + sleep);
-
- Thread.sleep(sleep);
-
- info("Storing value in cache [key=1, val=2]");
-
- cache.put(1, "2");
-
- info("Committing transaction: " + tx);
-
- tx.commit();
-
- assert false : "Timeout never happened for transaction: " + tx;
- }
- catch (GridCacheTxTimeoutException e) {
- info("Received expected timeout exception [msg=" + e.getMessage() + ", tx=" + tx + ']');
- }
- finally {
- tx.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java
new file mode 100644
index 0000000..f2b1640
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java
@@ -0,0 +1,194 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.GridCacheTxIsolation.REPEATABLE_READ;
+import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*;
+
+/**
+ *
+ */
+public abstract class IgniteTxConsistencyRestartAbstractSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Grid count. */
+ private static final int GRID_CNT = 4;
+
+ /** Key range. */
+ private static final int RANGE = 100_000;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ cfg.setCacheConfiguration(cacheConfiguration(gridName));
+
+ return cfg;
+ }
+
+ /**
+ * @param gridName Grid name.
+ * @return Cache configuration.
+ */
+ public GridCacheConfiguration cacheConfiguration(String gridName) {
+ GridCacheConfiguration ccfg = new GridCacheConfiguration();
+
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setCacheMode(cacheMode());
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setDistributionMode(partitionDistributionMode());
+ ccfg.setPreloadMode(SYNC);
+
+ if (cacheMode() == GridCacheMode.PARTITIONED)
+ ccfg.setBackups(1);
+
+ return ccfg;
+ }
+
+ /**
+ * @return Cache mode.
+ */
+ protected abstract GridCacheMode cacheMode();
+
+ /**
+ * @return Partition distribution mode for PARTITIONED cache.
+ */
+ protected abstract GridCacheDistributionMode partitionDistributionMode();
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxConsistency() throws Exception {
+ startGridsMultiThreaded(GRID_CNT);
+
+ IgniteDataLoader<Object, Object> ldr = grid(0).dataLoader(null);
+
+ for (int i = 0; i < RANGE; i++) {
+ ldr.addData(i, 0);
+
+ if (i > 0 && i % 1000 == 0)
+ info("Put keys: " + i);
+ }
+
+ ldr.close();
+
+ final AtomicBoolean done = new AtomicBoolean(false);
+
+ Thread restartThread = new Thread() {
+ @Override public void run() {
+ Random rnd = new Random();
+
+ while (!done.get()) {
+ try {
+ int idx = rnd.nextInt(GRID_CNT);
+
+ stopGrid(idx);
+
+ startGrid(idx);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+
+ restartThread.start();
+
+ Random rnd = new Random();
+
+ // Make some iterations with 1-3 keys transactions.
+ for (int i = 0; i < 50_000; i++) {
+ int idx = i % GRID_CNT;
+
+ if (i > 0 && i % 1000 == 0)
+ info("Running iteration: " + i);
+
+ try {
+ GridKernal grid = (GridKernal)grid(idx);
+
+ GridCache<Integer, Integer> cache = grid.cache(null);
+
+ List<Integer> keys = new ArrayList<>();
+
+ int keyCnt = rnd.nextInt(3);
+
+ for (int k = 0; k < keyCnt; k++)
+ keys.add(rnd.nextInt(RANGE));
+
+ Collections.sort(keys);
+
+ try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ Map<Integer, Integer> map = cache.getAll(keys);
+
+ for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
+ assertNotNull("Null value received from cache [key=" + entry.getKey() + "]", entry.getValue());
+
+ cache.put(entry.getKey(), entry.getValue() + 1);
+ }
+
+ tx.commit();
+ }
+ }
+ catch (Exception e) {
+ info("Failed to update keys: " + e.getMessage());
+ }
+ }
+
+ done.set(true);
+
+ restartThread.join();
+
+ for (int k = 0; k < RANGE; k++) {
+ Integer val = null;
+
+ for (int i = 0; i < GRID_CNT; i++) {
+ GridEx grid = grid(i);
+
+ GridCache<Integer, Integer> cache = grid.cache(null);
+
+ if (cache.affinity().isPrimaryOrBackup(grid.localNode(), k)) {
+ if (val == null) {
+ val = cache.peek(k);
+
+ assertNotNull("Failed to peek value for key: " + k, val);
+ }
+ else
+ assertEquals("Failed to find value in cache [primary=" +
+ cache.affinity().isPrimary(grid.localNode(), k) + ']',
+ val, cache.peek(k));
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
new file mode 100644
index 0000000..9f08861
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
@@ -0,0 +1,294 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.managers.communication.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.gridgain.grid.util.direct.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.testframework.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
+
+/**
+ * Abstract test for originating node failure.
+ */
+public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends GridCacheAbstractSelfTest {
+ /** */
+ protected static final int GRID_CNT = 5;
+
+ /** Ignore node ID. */
+ private volatile UUID ignoreMsgNodeId;
+
+ /** Ignore message class. */
+ private Class<?> ignoreMsgCls;
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testManyKeysCommit() throws Exception {
+ Collection<Integer> keys = new ArrayList<>(200);
+
+ for (int i = 0; i < 200; i++)
+ keys.add(i);
+
+ testTxOriginatingNodeFails(keys, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testManyKeysRollback() throws Exception {
+ Collection<Integer> keys = new ArrayList<>(200);
+
+ for (int i = 0; i < 200; i++)
+ keys.add(i);
+
+ testTxOriginatingNodeFails(keys, true);
+ }
+
+ /**
+ * @return Index of node starting transaction.
+ */
+ protected int originatingNode() {
+ return 0;
+ }
+
+ /**
+ * Ignores messages to given node of given type.
+ *
+ * @param dstNodeId Destination node ID.
+ * @param msgCls Message type.
+ */
+ protected void ignoreMessages(UUID dstNodeId, Class<?> msgCls) {
+ ignoreMsgNodeId = dstNodeId;
+ ignoreMsgCls = msgCls;
+ }
+
+ /**
+ * Gets ignore message class to simulate partial prepare message.
+ *
+ * @return Ignore message class.
+ */
+ protected abstract Class<?> ignoreMessageClass();
+
+ /**
+ * @param keys Keys to update.
+ * @param partial Flag indicating whether to simulate partial prepared state.
+ * @throws Exception If failed.
+ */
+ protected void testTxOriginatingNodeFails(Collection<Integer> keys, final boolean partial) throws Exception {
+ assertFalse(keys.isEmpty());
+
+ final Collection<GridKernal> grids = new ArrayList<>();
+
+ ClusterNode txNode = grid(originatingNode()).localNode();
+
+ for (int i = 1; i < gridCount(); i++)
+ grids.add((GridKernal)grid(i));
+
+ final Map<Integer, String> map = new HashMap<>();
+
+ final String initVal = "initialValue";
+
+ for (Integer key : keys) {
+ grid(originatingNode()).cache(null).put(key, initVal);
+
+ map.put(key, String.valueOf(key));
+ }
+
+ Map<Integer, Collection<ClusterNode>> nodeMap = new HashMap<>();
+
+ GridCacheAdapter<Integer, String> cache = ((GridKernal)grid(1)).internalCache();
+
+ info("Node being checked: " + grid(1).localNode().id());
+
+ for (Integer key : keys) {
+ Collection<ClusterNode> nodes = new ArrayList<>();
+
+ nodes.addAll(cache.affinity().mapKeyToPrimaryAndBackups(key));
+
+ nodes.remove(txNode);
+
+ nodeMap.put(key, nodes);
+ }
+
+ info("Starting tx [values=" + map + ", topVer=" +
+ ((GridKernal)grid(1)).context().discovery().topologyVersion() + ']');
+
+ if (partial)
+ ignoreMessages(grid(1).localNode().id(), ignoreMessageClass());
+
+ final Ignite txIgniteNode = G.ignite(txNode.id());
+
+ GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ GridCache<Integer, String> cache = txIgniteNode.cache(null);
+
+ assertNotNull(cache);
+
+ IgniteTxProxyImpl tx = (IgniteTxProxyImpl)cache.txStart();
+
+ GridCacheTxEx txEx = GridTestUtils.getFieldValue(tx, "tx");
+
+ cache.putAll(map);
+
+ try {
+ txEx.prepareAsync().get(3, TimeUnit.SECONDS);
+ }
+ catch (IgniteFutureTimeoutException ignored) {
+ info("Failed to wait for prepare future completion: " + partial);
+ }
+
+ return null;
+ }
+ }).get();
+
+ info("Stopping originating node " + txNode);
+
+ G.stop(G.ignite(txNode.id()).name(), true);
+
+ info("Stopped grid, waiting for transactions to complete.");
+
+ boolean txFinished = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ for (GridKernal g : grids) {
+ GridCacheSharedContext<Object, Object> ctx = g.context().cache().context();
+
+ int txNum = ctx.tm().idMapSize();
+
+ if (txNum != 0)
+ return false;
+ }
+
+ return true;
+ }
+ }, 10000);
+
+ assertTrue(txFinished);
+
+ info("Transactions finished.");
+
+ for (Map.Entry<Integer, Collection<ClusterNode>> e : nodeMap.entrySet()) {
+ final Integer key = e.getKey();
+
+ final String val = map.get(key);
+
+ assertFalse(e.getValue().isEmpty());
+
+ for (ClusterNode node : e.getValue()) {
+ compute(G.ignite(node.id()).cluster().forNode(node)).call(new Callable<Void>() {
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public Void call() throws Exception {
+ GridCache<Integer, String> cache = ignite.cache(null);
+
+ assertNotNull(cache);
+
+ assertEquals(partial ? initVal : val, cache.peek(key));
+
+ return null;
+ }
+ });
+ }
+ }
+
+ for (Map.Entry<Integer, String> e : map.entrySet()) {
+ for (Ignite g : G.allGrids()) {
+ UUID locNodeId = g.cluster().localNode().id();
+
+ assertEquals("Check failed for node: " + locNodeId, partial ? initVal : e.getValue(),
+ g.cache(null).get(e.getKey()));
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setCommunicationSpi(new TcpCommunicationSpi() {
+ @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg)
+ throws IgniteSpiException {
+ if (!F.eq(ignoreMsgNodeId, node.id()) || !ignoredMessage((GridIoMessage)msg))
+ super.sendMessage(node, msg);
+ }
+ });
+
+ cfg.getTransactionsConfiguration().setDefaultTxConcurrency(OPTIMISTIC);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ GridCacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+ cfg.setStore(null);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return GRID_CNT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected abstract GridCacheMode cacheMode();
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGridsMultiThreaded(GRID_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ ignoreMsgCls = null;
+ ignoreMsgNodeId = null;
+ }
+
+ /**
+ * Checks if message should be ignored.
+ *
+ * @param msg Message.
+ * @return {@code True} if message should be ignored.
+ */
+ private boolean ignoredMessage(GridIoMessage msg) {
+ return ignoreMsgCls != null && ignoreMsgCls.isAssignableFrom(msg.message().getClass());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
new file mode 100644
index 0000000..0d28928
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
@@ -0,0 +1,488 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.managers.communication.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.gridgain.grid.util.direct.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.testframework.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
+
+/**
+ * Abstract test for originating node failure.
+ */
+public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest extends GridCacheAbstractSelfTest {
+ /** */
+ protected static final int GRID_CNT = 5;
+
+ /** Ignore node ID. */
+ private volatile Collection<UUID> ignoreMsgNodeIds;
+
+ /** Ignore message class. */
+ private Collection<Class<?>> ignoreMsgCls;
+
+ /** Failing node ID. */
+ private UUID failingNodeId;
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testManyKeysCommit() throws Exception {
+ Collection<Integer> keys = new ArrayList<>(200);
+
+ for (int i = 0; i < 200; i++)
+ keys.add(i);
+
+ testTxOriginatingNodeFails(keys, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testManyKeysRollback() throws Exception {
+ Collection<Integer> keys = new ArrayList<>(200);
+
+ for (int i = 0; i < 200; i++)
+ keys.add(i);
+
+ testTxOriginatingNodeFails(keys, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureCommit() throws Exception {
+ checkPrimaryNodeCrash(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPrimaryNodeFailureRollback() throws Exception {
+ checkPrimaryNodeCrash(false);
+ }
+
+ /**
+ * @return Index of node starting transaction.
+ */
+ protected int originatingNode() {
+ return 0;
+ }
+
+ /**
+ * Ignores messages to given node of given type.
+ *
+ * @param dstNodeIds Destination node IDs.
+ * @param msgCls Message type.
+ */
+ protected void ignoreMessages(Collection<Class<?>> msgCls, Collection<UUID> dstNodeIds) {
+ ignoreMsgNodeIds = dstNodeIds;
+ ignoreMsgCls = msgCls;
+ }
+
+ /**
+ * Gets ignore message class to simulate partial prepare message.
+ *
+ * @return Ignore message class.
+ */
+ protected abstract Collection<Class<?>> ignoreMessageClasses();
+
+ /**
+ * @param keys Keys to update.
+ * @param fullFailure Flag indicating whether to simulate rollback state.
+ * @throws Exception If failed.
+ */
+ protected void testTxOriginatingNodeFails(Collection<Integer> keys, final boolean fullFailure) throws Exception {
+ assertFalse(keys.isEmpty());
+
+ final Collection<GridKernal> grids = new ArrayList<>();
+
+ ClusterNode txNode = grid(originatingNode()).localNode();
+
+ for (int i = 1; i < gridCount(); i++)
+ grids.add((GridKernal)grid(i));
+
+ failingNodeId = grid(0).localNode().id();
+
+ final Map<Integer, String> map = new HashMap<>();
+
+ final String initVal = "initialValue";
+
+ for (Integer key : keys) {
+ grid(originatingNode()).cache(null).put(key, initVal);
+
+ map.put(key, String.valueOf(key));
+ }
+
+ Map<Integer, Collection<ClusterNode>> nodeMap = new HashMap<>();
+
+ GridCacheAdapter<Integer, String> cache = ((GridKernal)grid(1)).internalCache();
+
+ info("Node being checked: " + grid(1).localNode().id());
+
+ for (Integer key : keys) {
+ Collection<ClusterNode> nodes = new ArrayList<>();
+
+ nodes.addAll(cache.affinity().mapKeyToPrimaryAndBackups(key));
+
+ nodes.remove(txNode);
+
+ nodeMap.put(key, nodes);
+ }
+
+ info("Starting tx [values=" + map + ", topVer=" +
+ ((GridKernal)grid(1)).context().discovery().topologyVersion() + ']');
+
+ if (fullFailure)
+ ignoreMessages(ignoreMessageClasses(), allNodeIds());
+ else
+ ignoreMessages(ignoreMessageClasses(), F.asList(grid(1).localNode().id()));
+
+ final GridEx originatingNodeGrid = grid(originatingNode());
+
+ GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ GridCache<Integer, String> cache = originatingNodeGrid.cache(null);
+
+ assertNotNull(cache);
+
+ IgniteTx tx = cache.txStart();
+
+ try {
+ cache.putAll(map);
+
+ info("Before commitAsync");
+
+ IgniteFuture<IgniteTx> fut = tx.commitAsync();
+
+ info("Got future for commitAsync().");
+
+ fut.get(3, TimeUnit.SECONDS);
+ }
+ catch (IgniteFutureTimeoutException ignored) {
+ info("Failed to wait for commit future completion [fullFailure=" + fullFailure + ']');
+ }
+
+ return null;
+ }
+ }).get();
+
+ info(">>> Stopping originating node " + txNode);
+
+ G.stop(grid(originatingNode()).name(), true);
+
+ ignoreMessages(Collections.<Class<?>>emptyList(), Collections.<UUID>emptyList());
+
+ info(">>> Stopped originating node: " + txNode.id());
+
+ boolean txFinished = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ for (GridKernal g : grids) {
+ GridCacheAdapter<?, ?> cache = g.internalCache();
+
+ GridCacheTxManager txMgr = cache.isNear() ?
+ ((GridNearCacheAdapter)cache).dht().context().tm() :
+ cache.context().tm();
+
+ int txNum = txMgr.idMapSize();
+
+ if (txNum != 0)
+ return false;
+ }
+
+ return true;
+ }
+ }, 10000);
+
+ assertTrue(txFinished);
+
+ info("Transactions finished.");
+
+ for (Map.Entry<Integer, Collection<ClusterNode>> e : nodeMap.entrySet()) {
+ final Integer key = e.getKey();
+
+ final String val = map.get(key);
+
+ assertFalse(e.getValue().isEmpty());
+
+ for (ClusterNode node : e.getValue()) {
+ final UUID checkNodeId = node.id();
+
+ compute(G.ignite(checkNodeId).cluster().forNode(node)).call(new Callable<Void>() {
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public Void call() throws Exception {
+ GridCache<Integer, String> cache = ignite.cache(null);
+
+ assertNotNull(cache);
+
+ assertEquals("Failed to check entry value on node: " + checkNodeId,
+ fullFailure ? initVal : val, cache.peek(key));
+
+ return null;
+ }
+ });
+ }
+ }
+
+ for (Map.Entry<Integer, String> e : map.entrySet()) {
+ for (Ignite g : G.allGrids())
+ assertEquals(fullFailure ? initVal : e.getValue(), g.cache(null).get(e.getKey()));
+ }
+ }
+
+ /**
+ * Checks tx data consistency in case when primary node crashes.
+ *
+ * @param commmit Whether to commit or rollback a transaction.
+ * @throws Exception If failed.
+ */
+ private void checkPrimaryNodeCrash(final boolean commmit) throws Exception {
+ Collection<Integer> keys = new ArrayList<>(20);
+
+ for (int i = 0; i < 20; i++)
+ keys.add(i);
+
+ final Collection<GridKernal> grids = new ArrayList<>();
+
+ ClusterNode primaryNode = grid(1).localNode();
+
+ for (int i = 0; i < gridCount(); i++) {
+ if (i != 1)
+ grids.add((GridKernal)grid(i));
+ }
+
+ failingNodeId = primaryNode.id();
+
+ final Map<Integer, String> map = new HashMap<>();
+
+ final String initVal = "initialValue";
+
+ for (Integer key : keys) {
+ grid(originatingNode()).cache(null).put(key, initVal);
+
+ map.put(key, String.valueOf(key));
+ }
+
+ Map<Integer, Collection<ClusterNode>> nodeMap = new HashMap<>();
+
+ GridCache<Integer, String> cache = grid(0).cache(null);
+
+ info("Failing node ID: " + grid(1).localNode().id());
+
+ for (Integer key : keys) {
+ Collection<ClusterNode> nodes = new ArrayList<>();
+
+ nodes.addAll(cache.affinity().mapKeyToPrimaryAndBackups(key));
+
+ nodes.remove(primaryNode);
+
+ nodeMap.put(key, nodes);
+ }
+
+ info("Starting tx [values=" + map + ", topVer=" +
+ ((GridKernal)grid(1)).context().discovery().topologyVersion() + ']');
+
+ assertNotNull(cache);
+
+ try (IgniteTx tx = cache.txStart()) {
+ cache.getAll(keys);
+
+ // Should not send any messages.
+ cache.putAll(map);
+
+ // Fail the node in the middle of transaction.
+ info(">>> Stopping primary node " + primaryNode);
+
+ G.stop(G.ignite(primaryNode.id()).name(), true);
+
+ info(">>> Stopped originating node, finishing transaction: " + primaryNode.id());
+
+ if (commmit)
+ tx.commit();
+ else
+ tx.rollback();
+ }
+
+ boolean txFinished = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ for (GridKernal g : grids) {
+ GridCacheAdapter<?, ?> cache = g.internalCache();
+
+ GridCacheTxManager txMgr = cache.isNear() ?
+ ((GridNearCacheAdapter)cache).dht().context().tm() :
+ cache.context().tm();
+
+ int txNum = txMgr.idMapSize();
+
+ if (txNum != 0)
+ return false;
+ }
+
+ return true;
+ }
+ }, 10000);
+
+ assertTrue(txFinished);
+
+ info("Transactions finished.");
+
+ for (Map.Entry<Integer, Collection<ClusterNode>> e : nodeMap.entrySet()) {
+ final Integer key = e.getKey();
+
+ final String val = map.get(key);
+
+ assertFalse(e.getValue().isEmpty());
+
+ for (ClusterNode node : e.getValue()) {
+ final UUID checkNodeId = node.id();
+
+ compute(G.ignite(checkNodeId).cluster().forNode(node)).call(new Callable<Void>() {
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public Void call() throws Exception {
+ GridCache<Integer, String> cache = ignite.cache(null);
+
+ assertNotNull(cache);
+
+ assertEquals("Failed to check entry value on node: " + checkNodeId,
+ !commmit ? initVal : val, cache.peek(key));
+
+ return null;
+ }
+ });
+ }
+ }
+
+ for (Map.Entry<Integer, String> e : map.entrySet()) {
+ for (Ignite g : G.allGrids())
+ assertEquals(!commmit ? initVal : e.getValue(), g.cache(null).get(e.getKey()));
+ }
+ }
+
+ /**
+ * @return All node IDs.
+ */
+ private Collection<UUID> allNodeIds() {
+ Collection<UUID> nodeIds = new ArrayList<>(gridCount());
+
+ for (int i = 0; i < gridCount(); i++)
+ nodeIds.add(grid(i).localNode().id());
+
+ return nodeIds;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setCommunicationSpi(new TcpCommunicationSpi() {
+ @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg)
+ throws IgniteSpiException {
+ if (getSpiContext().localNode().id().equals(failingNodeId)) {
+ if (ignoredMessage((GridIoMessage)msg) && ignoreMsgNodeIds != null) {
+ for (UUID ignored : ignoreMsgNodeIds) {
+ if (node.id().equals(ignored))
+ return;
+ }
+ }
+ }
+
+ super.sendMessage(node, msg);
+ }
+ });
+
+ cfg.getTransactionsConfiguration().setDefaultTxConcurrency(PESSIMISTIC);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ GridCacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+ cfg.setStore(null);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return GRID_CNT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected abstract GridCacheMode cacheMode();
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGridsMultiThreaded(GRID_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ ignoreMsgCls = null;
+ ignoreMsgNodeIds = null;
+ }
+
+ /**
+ * Checks if message should be ignored.
+ *
+ * @param msg Message.
+ * @return {@code True} if message should be ignored.
+ */
+ private boolean ignoredMessage(GridIoMessage msg) {
+ Collection<Class<?>> ignoreClss = ignoreMsgCls;
+
+ if (ignoreClss != null) {
+ for (Class<?> ignoreCls : ignoreClss) {
+ if (ignoreCls.isAssignableFrom(msg.message().getClass()))
+ return true;
+ }
+
+ return false;
+ }
+ else
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
new file mode 100644
index 0000000..0483a26
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
@@ -0,0 +1,192 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.distributed;
+
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.testframework.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
+
+/**
+ * Tests transaction during cache preloading.
+ */
+public abstract class IgniteTxPreloadAbstractTest extends GridCacheAbstractSelfTest {
+ /** */
+ private static final int GRID_CNT = 6;
+
+ /** */
+ private static volatile boolean keyNotLoaded;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ keyNotLoaded = false;
+
+ startGrid(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return GRID_CNT;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemoteTxPreloading() throws Exception {
+ GridCache<String, Integer> cache = cache(0);
+
+ for (int i = 0; i < 10000; i++)
+ cache.put(String.valueOf(i), 0);
+
+ final AtomicInteger gridIdx = new AtomicInteger(1);
+
+ IgniteFuture<?> fut = GridTestUtils.runMultiThreadedAsync(
+ new Callable<Object>() {
+ @Nullable @Override public Object call() throws Exception {
+ int idx = gridIdx.getAndIncrement();
+
+ startGrid(idx);
+
+ return null;
+ }
+ },
+ GRID_CNT - 1,
+ "grid-starter-" + getName()
+ );
+
+ waitForRemoteNodes(grid(0), 2);
+
+ Set<String> keys = new HashSet<>();
+
+ for (int i = 0; i < 10; i++)
+ keys.add(String.valueOf(i * 1000));
+
+ cache.transformAll(keys, new C1<Integer, Integer>() {
+ @Override public Integer apply(Integer val) {
+ if (val == null)
+ keyNotLoaded = true;
+
+ return val + 1;
+ }
+ });
+
+ assertFalse(keyNotLoaded);
+
+ fut.get();
+
+ for (int i = 0; i < GRID_CNT; i++)
+ // Wait for preloader.
+ cache(i).forceRepartition().get();
+
+ for (int i = 0; i < GRID_CNT; i++) {
+ for (String key : keys)
+ assertEquals("Unexpected value for cache " + i, (Integer)1, cache(i).get(key));
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLocalTxPreloadingOptimistic() throws Exception {
+ testLocalTxPreloading(OPTIMISTIC);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLocalTxPreloadingPessimistic() throws Exception {
+ testLocalTxPreloading(PESSIMISTIC);
+ }
+
+ /**
+ * Tries to execute transaction doing transform when target key is not yet preloaded.
+ *
+ * @param txConcurrency Transaction concurrency;
+ * @throws Exception If failed.
+ */
+ private void testLocalTxPreloading(GridCacheTxConcurrency txConcurrency) throws Exception {
+ Map<String, Integer> map = new HashMap<>();
+
+ for (int i = 0; i < 10000; i++)
+ map.put(String.valueOf(i), 0);
+
+ GridCache<String, Integer> cache0 = cache(0);
+
+ cache0.putAll(map);
+
+ final String TX_KEY = "9000";
+
+ int expVal = 0;
+
+ for (int i = 1; i < GRID_CNT; i++) {
+ assertEquals((Integer)expVal, cache0.get(TX_KEY));
+
+ startGrid(i);
+
+ GridCache<String, Integer> cache = cache(i);
+
+ try (IgniteTx tx = cache.txStart(txConcurrency, GridCacheTxIsolation.READ_COMMITTED)) {
+ cache.transform(TX_KEY, new C1<Integer, Integer>() {
+ @Override public Integer apply(Integer val) {
+ if (val == null) {
+ keyNotLoaded = true;
+
+ return 1;
+ }
+
+ return val + 1;
+ }
+ });
+
+ tx.commit();
+ }
+
+ assertFalse(keyNotLoaded);
+
+ expVal++;
+
+ assertEquals((Integer)expVal, cache.get(TX_KEY));
+ }
+
+ for (int i = 0; i < GRID_CNT; i++)
+ assertEquals("Unexpected value for cache " + i, (Integer)expVal, cache(i).get(TX_KEY));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ GridCacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+ cfg.setPreloadMode(GridCachePreloadMode.ASYNC);
+
+ cfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+
+ cfg.setStore(null);
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java
new file mode 100644
index 0000000..7a8f65e
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java
@@ -0,0 +1,147 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.transactions.GridCacheTxConcurrency.*;
+import static org.apache.ignite.transactions.GridCacheTxIsolation.*;
+
+/**
+ * Simple cache test.
+ */
+public class IgniteTxTimeoutAbstractTest extends GridCommonAbstractTest {
+ /** Random number generator. */
+ private static final Random RAND = new Random();
+
+ /** Grid count. */
+ private static final int GRID_COUNT = 2;
+
+ /** Grid instances. */
+ private static final List<Ignite> IGNITEs = new ArrayList<>();
+
+ /** Transaction timeout. */
+ private static final long TIMEOUT = 50;
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Override protected void beforeTestsStarted() throws Exception {
+ for (int i = 0; i < GRID_COUNT; i++)
+ IGNITEs.add(startGrid(i));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ IGNITEs.clear();
+ }
+
+ /**
+ * @param i Grid index.
+ * @return Cache.
+ */
+ @Override protected <K, V> GridCache<K, V> cache(int i) {
+ return IGNITEs.get(i).cache(null);
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testPessimisticReadCommitted() throws Exception {
+ checkTransactionTimeout(PESSIMISTIC, READ_COMMITTED);
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testPessimisticRepeatableRead() throws Exception {
+ checkTransactionTimeout(PESSIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testPessimisticSerializable() throws Exception {
+ checkTransactionTimeout(PESSIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testOptimisticReadCommitted() throws Exception {
+ checkTransactionTimeout(OPTIMISTIC, READ_COMMITTED);
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testOptimisticRepeatableRead() throws Exception {
+ checkTransactionTimeout(OPTIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * @throws IgniteCheckedException If test failed.
+ */
+ public void testOptimisticSerializable() throws Exception {
+ checkTransactionTimeout(OPTIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @param concurrency Concurrency.
+ * @param isolation Isolation.
+ * @throws IgniteCheckedException If test failed.
+ */
+ private void checkTransactionTimeout(GridCacheTxConcurrency concurrency,
+ GridCacheTxIsolation isolation) throws Exception {
+
+ int idx = RAND.nextInt(GRID_COUNT);
+
+ GridCache<Integer, String> cache = cache(idx);
+
+ IgniteTx tx = cache.txStart(concurrency, isolation, TIMEOUT, 0);
+
+ try {
+ info("Storing value in cache [key=1, val=1]");
+
+ cache.put(1, "1");
+
+ long sleep = TIMEOUT * 2;
+
+ info("Going to sleep for (ms): " + sleep);
+
+ Thread.sleep(sleep);
+
+ info("Storing value in cache [key=1, val=2]");
+
+ cache.put(1, "2");
+
+ info("Committing transaction: " + tx);
+
+ tx.commit();
+
+ assert false : "Timeout never happened for transaction: " + tx;
+ }
+ catch (GridCacheTxTimeoutException e) {
+ info("Received expected timeout exception [msg=" + e.getMessage() + ", tx=" + tx + ']');
+ }
+ finally {
+ tx.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java
index c79683d..f629304 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java
@@ -237,7 +237,7 @@ public abstract class GridCacheAbstractTransformWriteThroughSelfTest extends Gri
info(">>> Starting transform transaction");
- try (GridCacheTx tx = cache.txStart(concurrency, READ_COMMITTED)) {
+ try (IgniteTx tx = cache.txStart(concurrency, READ_COMMITTED)) {
if (op == OP_UPDATE) {
for (String key : keys)
cache.transform(key, INCR_CLOS);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedDebugTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedDebugTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedDebugTest.java
index ff016c6..76bf537 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedDebugTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedDebugTest.java
@@ -420,7 +420,7 @@ public class GridCacheColocatedDebugTest extends GridCommonAbstractTest {
g0.cache(null).put(i, i);
for (int i = 0; i < 100; i++) {
- try (GridCacheTx tx = g0.cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (IgniteTx tx = g0.cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) {
Integer val = (Integer) g0.cache(null).get(i);
assertEquals((Integer) i, val);
@@ -443,7 +443,7 @@ public class GridCacheColocatedDebugTest extends GridCommonAbstractTest {
startGrid();
try {
- GridCacheTx tx = explicitTx ? cache().txStart(concurrency, isolation) : null;
+ IgniteTx tx = explicitTx ? cache().txStart(concurrency, isolation) : null;
try {
cache().putAll(F.asMap(1, "Hello", 2, "World"));
@@ -476,7 +476,7 @@ public class GridCacheColocatedDebugTest extends GridCommonAbstractTest {
startGrid();
try {
- GridCacheTx tx = cache().txStart(concurrency, isolation);
+ IgniteTx tx = cache().txStart(concurrency, isolation);
try {
String old = (String)cache().get(1);
@@ -532,7 +532,7 @@ public class GridCacheColocatedDebugTest extends GridCommonAbstractTest {
Map<Integer, String> map = F.asMap(k0, "val" + k0, k1, "val" + k1, k2, "val" + k2);
- GridCacheTx tx = explicitTx ? g0.cache(null).txStart(concurrency, isolation) : null;
+ IgniteTx tx = explicitTx ? g0.cache(null).txStart(concurrency, isolation) : null;
try {
if (separate) {
@@ -621,7 +621,7 @@ public class GridCacheColocatedDebugTest extends GridCommonAbstractTest {
Map<Integer, String> map = F.asMap(k1, "val" + k1, k2, "val" + k2);
- GridCacheTx tx = explicitTx ? g0.cache(null).txStart(concurrency, isolation) : null;
+ IgniteTx tx = explicitTx ? g0.cache(null).txStart(concurrency, isolation) : null;
try {
if (separate) {
@@ -734,7 +734,7 @@ public class GridCacheColocatedDebugTest extends GridCommonAbstractTest {
clearStores(3);
- try (GridCacheTx tx = g0.cache(null).txStart(OPTIMISTIC, READ_COMMITTED)) {
+ try (IgniteTx tx = g0.cache(null).txStart(OPTIMISTIC, READ_COMMITTED)) {
g0.cache(null).putAll(map);
tx.commit();
@@ -799,7 +799,7 @@ public class GridCacheColocatedDebugTest extends GridCommonAbstractTest {
Map<Integer, String> map = F.asMap(k0, "value" + k0, k1, "value" + k1, k2, "value" + k2);
- GridCacheTx tx = g0.cache(null).txStart(concurrency, isolation);
+ IgniteTx tx = g0.cache(null).txStart(concurrency, isolation);
try {
if (separate) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedOptimisticTransactionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedOptimisticTransactionSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedOptimisticTransactionSelfTest.java
index c6f5d4d..7855a99 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedOptimisticTransactionSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedOptimisticTransactionSelfTest.java
@@ -109,7 +109,7 @@ public class GridCacheColocatedOptimisticTransactionSelfTest extends GridCommonA
*/
public void testOptimisticTransaction() throws Exception {
for (GridCache<Integer, String> cache : caches) {
- GridCacheTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ);
+ IgniteTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ);
try {
cache.put(KEY, VAL);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedTxExceptionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedTxExceptionSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedTxExceptionSelfTest.java
index 7629566..a62e1c3 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedTxExceptionSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedTxExceptionSelfTest.java
@@ -18,7 +18,7 @@ import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
/**
* Tests colocated cache.
*/
-public class GridCacheColocatedTxExceptionSelfTest extends GridCacheTxExceptionAbstractSelfTest {
+public class GridCacheColocatedTxExceptionSelfTest extends IgniteTxExceptionAbstractSelfTest {
/** {@inheritDoc} */
@Override protected GridCacheMode cacheMode() {
return PARTITIONED;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest.java
index 94707fd..662ebe8 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest.java
@@ -23,7 +23,7 @@ import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
* Test pessimistic tx failures in colocated cache.
*/
public class GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest extends
- GridCacheTxPessimisticOriginatingNodeFailureAbstractSelfTest {
+ IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest {
/** {@inheritDoc} */
@Override protected Collection<Class<?>> ignoreMessageClasses() {
return F.asList((Class<?>)GridNearTxFinishRequest.class, GridDhtTxFinishRequest.class);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedTxSingleThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedTxSingleThreadedSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedTxSingleThreadedSelfTest.java
index 3da14de..e08e874 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedTxSingleThreadedSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheColocatedTxSingleThreadedSelfTest.java
@@ -24,7 +24,7 @@ import static org.gridgain.grid.cache.GridCachePreloadMode.*;
/**
* Test txs in single-threaded mode for colocated cache.
*/
-public class GridCacheColocatedTxSingleThreadedSelfTest extends GridCacheTxSingleThreadedAbstractTest {
+public class GridCacheColocatedTxSingleThreadedSelfTest extends IgniteTxSingleThreadedAbstractTest {
/** Cache debug flag. */
private static final boolean CACHE_DEBUG = false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java
index e67a1a9..7b52984 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java
@@ -102,7 +102,7 @@ public class GridCacheDhtEntrySelfTest extends GridCommonAbstractTest {
}
for (int i = 0; i < GRID_CNT; i++) {
- GridCacheTx tx = grid(i).cache(null).tx();
+ IgniteTx tx = grid(i).cache(null).tx();
if (tx != null)
tx.close();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheDhtTxPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheDhtTxPreloadSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheDhtTxPreloadSelfTest.java
index 7052700..9950d62 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheDhtTxPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheDhtTxPreloadSelfTest.java
@@ -10,7 +10,6 @@
package org.gridgain.grid.kernal.processors.cache.distributed.dht;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.affinity.consistenthash.*;
import org.gridgain.grid.kernal.processors.cache.distributed.*;
import static org.gridgain.grid.cache.GridCacheMode.*;
@@ -19,7 +18,7 @@ import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
/**
* Tests cache transaction during preloading.
*/
-public class GridCacheDhtTxPreloadSelfTest extends GridCacheTxPreloadAbstractTest {
+public class GridCacheDhtTxPreloadSelfTest extends IgniteTxPreloadAbstractTest {
/** {@inheritDoc} */
@Override protected GridCacheMode cacheMode() {
return PARTITIONED;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheGroupLockPartitionedAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheGroupLockPartitionedAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheGroupLockPartitionedAbstractSelfTest.java
index c98b0a3..bf15118 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheGroupLockPartitionedAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheGroupLockPartitionedAbstractSelfTest.java
@@ -75,7 +75,7 @@ public abstract class GridCacheGroupLockPartitionedAbstractSelfTest extends Grid
cache.put(new GridCacheAffinityKey<>(i, affinityKey), i);
for (int i = 0; i < 3; i++) {
- try (GridCacheTx tx = cache.txStartAffinity(affinityKey, concurrency, isolation, 0, 10)) {
+ try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, isolation, 0, 10)) {
Set<GridCacheEntry<GridCacheAffinityKey<Integer>, Integer>> set =
cache.entrySet(cache(0).affinity().partition(affinityKey));
@@ -106,7 +106,7 @@ public abstract class GridCacheGroupLockPartitionedAbstractSelfTest extends Grid
final GridCache<UUID, String> cache = grid(0).cache(null);
- try (GridCacheTx tx = cache.txStartPartition(cache.affinity().partition(affinityKey), PESSIMISTIC, REPEATABLE_READ,
+ try (IgniteTx tx = cache.txStartPartition(cache.affinity().partition(affinityKey), PESSIMISTIC, REPEATABLE_READ,
0, 2)) {
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheGroupLockPartitionedMultiNodeAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheGroupLockPartitionedMultiNodeAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheGroupLockPartitionedMultiNodeAbstractSelfTest.java
index e11afb3..2f37a2e 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheGroupLockPartitionedMultiNodeAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheGroupLockPartitionedMultiNodeAbstractSelfTest.java
@@ -52,7 +52,7 @@ public abstract class GridCacheGroupLockPartitionedMultiNodeAbstractSelfTest ext
GridCache<Object, Object> cache = grid(0).cache(null);
- GridCacheTx tx = null;
+ IgniteTx tx = null;
try {
tx = cache.txStartAffinity(key, concurrency, READ_COMMITTED, 0, 2);
@@ -146,7 +146,7 @@ public abstract class GridCacheGroupLockPartitionedMultiNodeAbstractSelfTest ext
assertEquals("val3", reader.cache(null).peek(key3));
}
- try (GridCacheTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 3)) {
+ try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 3)) {
cache.putAll(F.asMap(
key1, "val01",
key2, "val02",
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCachePartitionedTopologyChangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCachePartitionedTopologyChangeSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCachePartitionedTopologyChangeSelfTest.java
index 7b0d1b7..bf2bd80 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCachePartitionedTopologyChangeSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCachePartitionedTopologyChangeSelfTest.java
@@ -247,7 +247,7 @@ public class GridCachePartitionedTopologyChangeSelfTest extends GridCommonAbstra
GridCache<Integer, Integer> cache = node.cache(null);
try {
- try (GridCacheTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache.put(key, key);
info(">>> Locked key, waiting for latch: " + key);
@@ -318,7 +318,7 @@ public class GridCachePartitionedTopologyChangeSelfTest extends GridCommonAbstra
int key = (int)Thread.currentThread().getId();
try {
- try (GridCacheTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
// This method should block until all previous transactions are completed.
cache.put(key, key);
@@ -401,7 +401,7 @@ public class GridCachePartitionedTopologyChangeSelfTest extends GridCommonAbstra
GridCache<Integer, Integer> cache = node.cache(null);
try {
- try (GridCacheTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache.put(key, key);
commitLatch.await();
@@ -454,7 +454,7 @@ public class GridCachePartitionedTopologyChangeSelfTest extends GridCommonAbstra
int key = (int)Thread.currentThread().getId();
try {
- try (GridCacheTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
// This method should block until all previous transactions are completed.
cache.put(key, key);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
index d324abd..80aec71 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
@@ -22,7 +22,7 @@ import java.util.*;
* Tests transaction consistency when originating node fails.
*/
public class GridCachePartitionedTxOriginatingNodeFailureSelfTest extends
- GridCacheTxOriginatingNodeFailureAbstractSelfTest {
+ IgniteTxOriginatingNodeFailureAbstractSelfTest {
/** */
private static final int BACKUP_CNT = 2;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheTxConsistencyColocatedRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheTxConsistencyColocatedRestartSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheTxConsistencyColocatedRestartSelfTest.java
deleted file mode 100644
index b74cfc9..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheTxConsistencyColocatedRestartSelfTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
-
-/**
- *
- */
-public class GridCacheTxConsistencyColocatedRestartSelfTest extends GridCacheTxConsistencyRestartAbstractSelfTest {
- /** {@inheritDoc} */
- @Override protected GridCacheMode cacheMode() {
- return GridCacheMode.PARTITIONED;
- }
-
- /** {@inheritDoc} */
- @Override protected GridCacheDistributionMode partitionDistributionMode() {
- return GridCacheDistributionMode.PARTITIONED_ONLY;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheTxReentryColocatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheTxReentryColocatedSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheTxReentryColocatedSelfTest.java
deleted file mode 100644
index faf74f3..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheTxReentryColocatedSelfTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.cluster.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-
-import java.util.*;
-
-import static org.gridgain.grid.cache.GridCacheMode.*;
-
-/**
- *
- */
-public class GridCacheTxReentryColocatedSelfTest extends GridCacheTxReentryAbstractSelfTest {
- /** {@inheritDoc} */
- @Override protected GridCacheMode cacheMode() {
- return PARTITIONED;
- }
-
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return 3;
- }
-
- /** {@inheritDoc} */
- @Override protected int testKey() {
- int key = 0;
-
- GridCache<Object, Object> cache = grid(0).cache(null);
-
- while (true) {
- Collection<ClusterNode> nodes = cache.affinity().mapKeyToPrimaryAndBackups(key);
-
- if (nodes.contains(grid(0).localNode()))
- key++;
- else
- break;
- }
-
- return key;
- }
-
- /** {@inheritDoc} */
- @Override protected int expectedNearLockRequests() {
- return 1;
- }
-
- /** {@inheritDoc} */
- @Override protected int expectedDhtLockRequests() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override protected int expectedDistributedLockRequests() {
- return 1;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean nearEnabled() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/IgniteTxConsistencyColocatedRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/IgniteTxConsistencyColocatedRestartSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/IgniteTxConsistencyColocatedRestartSelfTest.java
new file mode 100644
index 0000000..7756089
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/IgniteTxConsistencyColocatedRestartSelfTest.java
@@ -0,0 +1,28 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.distributed.dht;
+
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+
+/**
+ *
+ */
+public class IgniteTxConsistencyColocatedRestartSelfTest extends IgniteTxConsistencyRestartAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected GridCacheMode cacheMode() {
+ return GridCacheMode.PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheDistributionMode partitionDistributionMode() {
+ return GridCacheDistributionMode.PARTITIONED_ONLY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/IgniteTxReentryColocatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/IgniteTxReentryColocatedSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/IgniteTxReentryColocatedSelfTest.java
new file mode 100644
index 0000000..812438f
--- /dev/null
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/IgniteTxReentryColocatedSelfTest.java
@@ -0,0 +1,71 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cluster.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+
+import java.util.*;
+
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ *
+ */
+public class IgniteTxReentryColocatedSelfTest extends IgniteTxReentryAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected GridCacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int testKey() {
+ int key = 0;
+
+ GridCache<Object, Object> cache = grid(0).cache(null);
+
+ while (true) {
+ Collection<ClusterNode> nodes = cache.affinity().mapKeyToPrimaryAndBackups(key);
+
+ if (nodes.contains(grid(0).localNode()))
+ key++;
+ else
+ break;
+ }
+
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int expectedNearLockRequests() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int expectedDhtLockRequests() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int expectedDistributedLockRequests() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean nearEnabled() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheAtomicPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheAtomicPreloadSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheAtomicPreloadSelfTest.java
index d84a206..8884116 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheAtomicPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheAtomicPreloadSelfTest.java
@@ -104,7 +104,7 @@ public class GridCacheAtomicPreloadSelfTest extends GridCommonAbstractTest {
info("Checking transaction for key [idx=" + i + ", key=" + key + ']');
info(">>>>>>>>>>>>>>>");
- try (GridCacheTx tx = txs.txStart(concurrency, REPEATABLE_READ)) {
+ try (IgniteTx tx = txs.txStart(concurrency, REPEATABLE_READ)) {
try {
// Lock if pessimistic, read if optimistic.
cache.get(key);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java
index 37f88a2..d86beff 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java
@@ -58,17 +58,17 @@ public class GridCacheGetStoreErrorSelfTest extends GridCommonAbstractTest {
cc.setAtomicityMode(TRANSACTIONAL);
cc.setStore(new GridCacheStoreAdapter<Object, Object>() {
- @Override public Object load(@Nullable GridCacheTx tx, Object key)
+ @Override public Object load(@Nullable IgniteTx tx, Object key)
throws IgniteCheckedException {
throw new IgniteCheckedException("Failed to get key from store: " + key);
}
- @Override public void put(@Nullable GridCacheTx tx, Object key,
+ @Override public void put(@Nullable IgniteTx tx, Object key,
@Nullable Object val) {
// No-op.
}
- @Override public void remove(@Nullable GridCacheTx tx, Object key) {
+ @Override public void remove(@Nullable IgniteTx tx, Object key) {
// No-op.
}
});