You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/11/28 12:54:47 UTC
ignite git commit: IGNITE-10080 Optimized Cache 6 long-running tests
- Fixes #5243.
Repository: ignite
Updated Branches:
refs/heads/master fe8c8cc58 -> 0e3404281
IGNITE-10080 Optimized Cache 6 long-running tests - Fixes #5243.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e340428
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e340428
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e340428
Branch: refs/heads/master
Commit: 0e3404281666668741fb8600d6e9021cccabe6d3
Parents: fe8c8cc
Author: pereslegin-pa <xx...@gmail.com>
Authored: Wed Nov 28 15:51:15 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Nov 28 15:51:15 2018 +0300
----------------------------------------------------------------------
.../distributed/CacheExchangeMergeTest.java | 4 +-
...ptimisticTxSuspendResumeMultiServerTest.java | 30 ---
.../IgniteOptimisticTxSuspendResumeTest.java | 228 +++++++++++++------
...OptimisticPrepareOnUnstableTopologyTest.java | 164 ++++++-------
.../transactions/TxRollbackOnTimeoutTest.java | 3 +-
.../ignite/testframework/GridTestUtils.java | 6 +-
.../testsuites/IgniteCacheTestSuite6.java | 2 -
7 files changed, 238 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 03ea539..2dad0b5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -419,7 +419,9 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
private void concurrentStart(final boolean withClients) throws Exception {
- for (int i = 0; i < 5; i++) {
+ int iterations = GridTestUtils.SF.applyLB(5, 1);
+
+ for (int i = 0; i < iterations; i++) {
log.info("Iteration: " + i);
startGrid(0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java
deleted file mode 100644
index b7003d4..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-/**
- *
- */
-public class IgniteOptimisticTxSuspendResumeMultiServerTest extends IgniteOptimisticTxSuspendResumeTest {
- /**
- * @return Number of server nodes.
- */
- @Override protected int serversNumber() {
- return 4;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
index 66c204d..73a7dc1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
@@ -19,17 +19,21 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.PA;
@@ -62,7 +66,13 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
private static final int FUT_TIMEOUT = 5000;
/** */
- private boolean client = false;
+ private static final int CLIENT_CNT = 2;
+
+ /** */
+ private static final int SERVER_CNT = 4;
+
+ /** */
+ private static final int GRID_CNT = CLIENT_CNT + SERVER_CNT;
/**
* List of closures to execute transaction operation that prohibited in suspended state.
@@ -109,6 +119,10 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ int idx = getTestIgniteInstanceIndex(igniteInstanceName);
+
+ boolean client = idx >= SERVER_CNT && idx < GRID_CNT;
+
cfg.setClientMode(client);
return cfg;
@@ -118,16 +132,21 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
- startGrids(serversNumber());
+ startGridsMultiThreaded(gridCount());
+ }
- if (serversNumber() > 1) {
- client = true;
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
- startGrid(serversNumber());
+ Ignite client = ignite(gridCount() - 1);
- startGrid(serversNumber() + 1);
+ assertTrue(client.cluster().localNode().isClient());
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ grid(0).createCache(ccfg);
- client = false;
+ client.createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
}
awaitPartitionMapExchange();
@@ -138,11 +157,19 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
stopAllGrids(true);
}
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ for (CacheConfiguration ccfg : cacheConfigurations())
+ ignite(0).destroyCache(ccfg.getName());
+
+ super.afterTest();
+ }
+
/**
* @return Number of server nodes.
*/
- protected int serversNumber() {
- return 1;
+ protected int gridCount() {
+ return GRID_CNT;
}
/**
@@ -215,8 +242,8 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
@Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
for (TransactionIsolation isolation : TransactionIsolation.values()) {
- final IgniteCache<Integer, Integer> otherCache =
- ignite.getOrCreateCache(cacheConfiguration(PARTITIONED, 0, false).setName("otherCache"));
+ final IgniteCache<Integer, Integer> otherCache = ignite.getOrCreateCache(
+ cacheConfiguration("otherCache", PARTITIONED, 0, false));
final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
@@ -435,10 +462,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
tx.suspend();
- long start = U.currentTimeMillis();
-
- while(TX_TIMEOUT >= U.currentTimeMillis() - start)
- Thread.sleep(TX_TIMEOUT * 2);
+ U.sleep(TX_TIMEOUT * 2);
GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -475,10 +499,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
cache.put(1, 1);
- long start = U.currentTimeMillis();
-
- while(TX_TIMEOUT >= U.currentTimeMillis() - start)
- Thread.sleep(TX_TIMEOUT * 2);
+ U.sleep(TX_TIMEOUT * 2);
GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -599,33 +620,92 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
* @throws Exception If failed.
*/
public void testSuspendTxAndResumeAfterTopologyChange() throws Exception {
- executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
- @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ Ignite srv = ignite(ThreadLocalRandom.current().nextInt(SERVER_CNT));
+ Ignite client = ignite(SERVER_CNT);
+ Ignite clientNear = ignite(SERVER_CNT + 1);
+
+ Map<String, List<List<Integer>>> cacheKeys = generateKeys(srv, TransactionIsolation.values().length);
+
+ doCheckSuspendTxAndResume(srv, cacheKeys);
+ doCheckSuspendTxAndResume(client, cacheKeys);
+ doCheckSuspendTxAndResume(clientNear, cacheKeys);
+ }
+
+ /**
+ * @param node Ignite isntance.
+ * @param cacheKeys Different key types mapped to cache name.
+ * @throws Exception If failed.
+ */
+ private void doCheckSuspendTxAndResume(Ignite node, Map<String, List<List<Integer>>> cacheKeys) throws Exception {
+ ClusterNode locNode = node.cluster().localNode();
+
+ log.info("Run test for node [node=" + locNode.id() + ", client=" + locNode.isClient() + ']');
+
+ Map<IgniteCache<Integer, Integer>, Map<Transaction, Integer>> cacheTxMap = new IdentityHashMap<>();
+
+ for (Map.Entry<String, List<List<Integer>>> cacheKeysEntry : cacheKeys.entrySet()) {
+ String cacheName = cacheKeysEntry.getKey();
+
+ IgniteCache<Integer, Integer> cache = node.cache(cacheName);
+
+ Map<Transaction, Integer> suspendedTxs = new IdentityHashMap<>();
+
+ for (List<Integer> keysList : cacheKeysEntry.getValue()) {
for (TransactionIsolation isolation : TransactionIsolation.values()) {
- Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+ Transaction tx = node.transactions().txStart(OPTIMISTIC, isolation);
- cache.put(1, 1);
+ int key = keysList.get(isolation.ordinal());
+
+ cache.put(key, key);
tx.suspend();
- assertEquals(SUSPENDED, tx.state());
+ suspendedTxs.put(tx, key);
- try (IgniteEx g = startGrid(serversNumber() + 3)) {
- tx.resume();
+ String msg = "node=" + node.cluster().localNode() +
+ ", cache=" + cacheName + ", isolation=" + isolation + ", key=" + key;
- assertEquals(ACTIVE, tx.state());
+ assertEquals(msg, SUSPENDED, tx.state());
+ }
+ }
- assertEquals(1, (int)cache.get(1));
+ cacheTxMap.put(cache, suspendedTxs);
+ }
- tx.commit();
+ int newNodeIdx = gridCount();
- assertEquals(1, (int)cache.get(1));
- }
+ startGrid(newNodeIdx);
- cache.removeAll();
+ try {
+ for (Map.Entry<IgniteCache<Integer, Integer>, Map<Transaction, Integer>> entry : cacheTxMap.entrySet()) {
+ IgniteCache<Integer, Integer> cache = entry.getKey();
+
+ for (Map.Entry<Transaction, Integer> suspendedTx : entry.getValue().entrySet()) {
+ Transaction tx = suspendedTx.getKey();
+
+ Integer key = suspendedTx.getValue();
+
+ tx.resume();
+
+ String msg = "node=" + node.cluster().localNode() +
+ ", cache=" + cache.getName() + ", isolation=" + tx.isolation() + ", key=" + key;
+
+ assertEquals(msg, ACTIVE, tx.state());
+
+ assertEquals(msg, key, cache.get(key));
+
+ tx.commit();
+
+ assertEquals(msg, key, cache.get(key));
}
}
- });
+ }
+ finally {
+ stopGrid(newNodeIdx);
+
+ for (IgniteCache<Integer, Integer> cache : cacheTxMap.keySet())
+ cache.removeAll();
+ }
}
/**
@@ -666,10 +746,10 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() {
List<CacheConfiguration<Integer, Integer>> cfgs = new ArrayList<>();
- cfgs.add(cacheConfiguration(PARTITIONED, 0, false));
- cfgs.add(cacheConfiguration(PARTITIONED, 1, false));
- cfgs.add(cacheConfiguration(PARTITIONED, 1, true));
- cfgs.add(cacheConfiguration(REPLICATED, 0, false));
+ cfgs.add(cacheConfiguration("cache1", PARTITIONED, 0, false));
+ cfgs.add(cacheConfiguration("cache2", PARTITIONED, 1, false));
+ cfgs.add(cacheConfiguration("cache3", PARTITIONED, 1, true));
+ cfgs.add(cacheConfiguration("cache4", REPLICATED, 0, false));
return cfgs;
}
@@ -681,10 +761,11 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
* @return Cache configuration.
*/
private CacheConfiguration<Integer, Integer> cacheConfiguration(
+ String name,
CacheMode cacheMode,
int backups,
boolean nearCache) {
- CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+ CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(name);
ccfg.setCacheMode(cacheMode);
ccfg.setAtomicityMode(TRANSACTIONAL);
@@ -701,37 +782,56 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
/**
* @param c Closure.
- * @throws Exception If failed.
*/
- private void executeTestForAllCaches(CI2<Ignite, IgniteCache<Integer, Integer>> c) throws Exception {
- for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
- ignite(0).createCache(ccfg);
+ private void executeTestForAllCaches(CI2<Ignite, IgniteCache<Integer, Integer>> c) {
+ for (int i = 0; i < gridCount(); i++) {
+ Ignite ignite = ignite(i);
- log.info("Run test for cache [cache=" + ccfg.getCacheMode() +
- ", backups=" + ccfg.getBackups() +
- ", near=" + (ccfg.getNearConfiguration() != null) + "]");
+ ClusterNode locNode = ignite.cluster().localNode();
- awaitPartitionMapExchange();
+ log.info("Run test for node [node=" + locNode.id() + ", client=" + locNode.isClient() + ']');
- int srvNum = serversNumber();
- if (serversNumber() > 1) {
- ignite(serversNumber() + 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
- srvNum += 2;
- }
+ for (CacheConfiguration ccfg : cacheConfigurations())
+ c.apply(ignite, ignite.cache(ccfg.getName()));
+ }
+ }
- try {
- for (int i = 0; i < srvNum; i++) {
- Ignite ignite = ignite(i);
+ /**
+ * Generates list of keys (primary, backup and neither primary nor backup).
+ *
+ * @param ignite Ignite instance.
+ * @param keysCnt The number of keys generated for each type of key.
+ * @return List of different keys mapped to cache name.
+ */
+ private Map<String, List<List<Integer>>> generateKeys(Ignite ignite, int keysCnt) {
+ Map<String, List<List<Integer>>> cacheKeys = new HashMap<>();
- log.info("Run test for node [node=" + i + ", client=" + ignite.configuration().isClientMode() + ']');
+ for (CacheConfiguration cfg : cacheConfigurations()) {
+ String cacheName = cfg.getName();
- c.apply(ignite, ignite.<Integer, Integer>cache(ccfg.getName()));
- }
- }
- finally {
- ignite(0).destroyCache(ccfg.getName());
+ IgniteCache cache = ignite.cache(cacheName);
+
+ List<List<Integer>> keys = new ArrayList<>();
+
+ // Generate different keys: 0 - primary, 1 - backup, 2 - neither primary nor backup.
+ for (int type = 0; type < 3; type++) {
+ if (type == 1 && cfg.getCacheMode() == PARTITIONED && cfg.getBackups() == 0)
+ continue;
+
+ if (type == 2 && cfg.getCacheMode() == REPLICATED)
+ continue;
+
+ List<Integer> keys0 = findKeys(cache, keysCnt, type * 100_000, type);
+
+ assertEquals(cacheName, keysCnt, keys0.size());
+
+ keys.add(keys0);
}
+
+ cacheKeys.put(cacheName, keys);
}
+
+ return cacheKeys;
}
/**
@@ -750,7 +850,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
*/
public abstract void applyx(E1 e1, E2 e2) throws Exception;
- /** {@inheritdoc} */
+ /** {@inheritDoc} */
@Override public void apply(E1 e1, E2 e2) {
try {
applyx(e1, e2);
@@ -775,7 +875,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
*/
public abstract void applyx(T o) throws Exception;
- /** {@inheritdoc} */
+ /** {@inheritDoc} */
@Override public void apply(T o) {
try {
applyx(o);
@@ -797,7 +897,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
*/
public abstract void runx() throws Exception;
- /** {@inheritdoc} */
+ /** {@inheritDoc} */
@Override public void run() {
try {
runx();
http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticPrepareOnUnstableTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticPrepareOnUnstableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticPrepareOnUnstableTopologyTest.java
index 21dcf90..cbdcffe 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticPrepareOnUnstableTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticPrepareOnUnstableTopologyTest.java
@@ -17,22 +17,23 @@
package org.apache.ignite.internal.processors.cache.transactions;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
@@ -49,21 +50,17 @@ public class TxOptimisticPrepareOnUnstableTopologyTest extends GridCommonAbstrac
/** */
public static final String CACHE_NAME = "part_cache";
- /** IP finder. */
- private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
/** */
- private volatile boolean run = true;
+ private static final int STARTUP_DELAY = 500;
/** */
- private boolean client;
+ private static final int GRID_CNT = 4;
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
- assertEquals(0, G.allGrids().size());
- }
+ /** */
+ private boolean client;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -110,58 +107,42 @@ public class TxOptimisticPrepareOnUnstableTopologyTest extends GridCommonAbstrac
*/
private void doPrepareOnUnstableTopology(int keys, boolean testClient, TransactionIsolation isolation,
long timeout) throws Exception {
- Collection<Thread> threads = new ArrayList<>();
-
- try {
- // Start grid 1.
- IgniteEx grid1 = startGrid(0);
-
- assertFalse(grid1.configuration().isClientMode());
-
- threads.add(runCacheOperations(grid1, isolation, timeout, keys));
-
- TimeUnit.SECONDS.sleep(3L);
-
- client = testClient; // If test client start on node in client mode.
-
- // Start grid 2.
- IgniteEx grid2 = startGrid(1);
-
- assertEquals((Object)testClient, grid2.configuration().isClientMode());
+ GridCompoundFuture<Void, Object> compFut = new GridCompoundFuture<>();
- client = false;
+ AtomicBoolean stopFlag = new AtomicBoolean();
- threads.add(runCacheOperations(grid2, isolation, timeout, keys));
-
- TimeUnit.SECONDS.sleep(3L);
-
- // Start grid 3.
- IgniteEx grid3 = startGrid(2);
+ try {
+ int clientIdx = testClient ? 1 : -1;
- assertFalse(grid3.configuration().isClientMode());
+ try {
+ for (int i = 0; i < GRID_CNT; i++) {
+ client = (clientIdx == i);
- if (testClient)
- log.info("Started client node: " + grid3.name());
+ IgniteEx grid = startGrid(i);
- threads.add(runCacheOperations(grid3, isolation, timeout, keys));
+ assertEquals(client, grid.configuration().isClientMode().booleanValue());
- TimeUnit.SECONDS.sleep(3L);
+ client = false;
- // Start grid 4.
- IgniteEx grid4 = startGrid(3);
+ IgniteInternalFuture<Void> fut = runCacheOperationsAsync(grid, stopFlag, isolation, timeout, keys);
- assertFalse(grid4.configuration().isClientMode());
+ compFut.add(fut);
- threads.add(runCacheOperations(grid4, isolation, timeout, keys));
+ U.sleep(STARTUP_DELAY);
+ }
+ }
+ finally {
+ stopFlag.set(true);
+ }
- TimeUnit.SECONDS.sleep(3L);
+ compFut.markInitialized();
- stopThreads(threads);
+ compFut.get();
- for (int i = 0; i < 4; i++) {
+ for (int i = 0; i < GRID_CNT; i++) {
IgniteTxManager tm = ((IgniteKernal)grid(i)).internalCache(CACHE_NAME).context().tm();
- assertEquals("txMap is not empty:" + i, 0, tm.idMapSize());
+ assertEquals("txMap is not empty: " + i, 0, tm.idMapSize());
}
}
finally {
@@ -170,63 +151,50 @@ public class TxOptimisticPrepareOnUnstableTopologyTest extends GridCommonAbstrac
}
/**
- * @param threads Thread which will be stopped.
- */
- private void stopThreads(Iterable<Thread> threads) {
- try {
- run = false;
-
- for (Thread thread : threads)
- thread.join();
- }
- catch (Exception e) {
- U.error(log(), "Couldn't stop threads.", e);
- }
- }
-
- /**
* @param node Node.
* @param isolation Isolation.
* @param timeout Timeout.
* @param keys Number of keys.
- * @return Running thread.
+ * @return Future representing pending completion of the operation.
*/
- private Thread runCacheOperations(Ignite node, TransactionIsolation isolation, long timeout, final int keys) {
- Thread t = new Thread() {
- @Override public void run() {
- while (run) {
- TreeMap<Integer, String> vals = generateValues(keys);
-
- try {
- try (Transaction tx = node.transactions().txStart(TransactionConcurrency.OPTIMISTIC, isolation,
- timeout, keys)){
-
- IgniteCache<Object, Object> cache = node.cache(CACHE_NAME);
-
- // Put or remove.
- if (ThreadLocalRandom.current().nextDouble(1) < 0.65)
- cache.putAll(vals);
- else
- cache.removeAll(vals.keySet());
-
- tx.commit();
- }
- catch (Exception e) {
- U.error(log(), "Failed cache operation.", e);
- }
-
- U.sleep(100);
+ private IgniteInternalFuture<Void> runCacheOperationsAsync(
+ Ignite node,
+ AtomicBoolean stopFlag,
+ TransactionIsolation isolation,
+ long timeout,
+ final int keys
+ ) {
+ return GridTestUtils.runAsync(() -> {
+ while (!stopFlag.get()) {
+ TreeMap<Integer, String> vals = generateValues(keys);
+
+ try {
+ try (Transaction tx = node.transactions().txStart(TransactionConcurrency.OPTIMISTIC, isolation,
+ timeout, keys)) {
+
+ IgniteCache<Object, Object> cache = node.cache(CACHE_NAME);
+
+ // Put or remove.
+ if (ThreadLocalRandom.current().nextDouble(1) < 0.65)
+ cache.putAll(vals);
+ else
+ cache.removeAll(vals.keySet());
+
+ tx.commit();
}
- catch (Exception e){
- U.error(log(), "Failed unlock.", e);
+ catch (Exception e) {
+ U.error(log(), "Failed cache operation.", e);
}
+
+ U.sleep(100);
+ }
+ catch (Exception e) {
+ U.error(log(), "Failed unlock.", e);
}
}
- };
-
- t.start();
- return t;
+ return null;
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
index 177444d..61e39ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
@@ -56,6 +56,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
@@ -78,7 +79,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
*/
public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
/** */
- private static final long DURATION = 60 * 1000L;
+ private static final long DURATION = SF.apply(60 * 1000);
/** */
private static final long TX_MIN_TIMEOUT = 1;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index dad5344..ee25b7a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -2024,7 +2024,7 @@ public final class GridTestUtils {
/** */
public static int apply(int val) {
- return (int) (TEST_SCALE_FACTOR_VALUE * val);
+ return (int) Math.round(TEST_SCALE_FACTOR_VALUE * val);
}
/** */
@@ -2034,12 +2034,12 @@ public final class GridTestUtils {
/** Apply scale factor with lower bound */
public static int applyLB(int val, int lowerBound) {
- return Math.max((int) (TEST_SCALE_FACTOR_VALUE * val), lowerBound);
+ return Math.max(apply(val), lowerBound);
}
/** Apply scale factor with upper bound */
public static int applyUB(int val, int upperBound) {
- return Math.min((int) (TEST_SCALE_FACTOR_VALUE * val), upperBound);
+ return Math.min(apply(val), upperBound);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 7bb476f..03cfb9f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheTryLockMulti
import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThreadLocalTxTest;
-import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeMultiServerTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest;
import org.apache.ignite.internal.processors.cache.transactions.TxLabelTest;
@@ -64,7 +63,6 @@ public class IgniteCacheTestSuite6 extends TestSuite {
suite.addTestSuite(GridCachePartitionEvictionDuringReadThroughSelfTest.class);
suite.addTestSuite(IgniteOptimisticTxSuspendResumeTest.class);
- suite.addTestSuite(IgniteOptimisticTxSuspendResumeMultiServerTest.class);
suite.addTestSuite(IgnitePessimisticTxSuspendResumeTest.class);
suite.addTestSuite(CacheExchangeMergeTest.class);