You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/11/28 12:54:47 UTC

ignite git commit: IGNITE-10080 Optimized Cache 6 long-running tests - Fixes #5243.

Repository: ignite
Updated Branches:
  refs/heads/master fe8c8cc58 -> 0e3404281


IGNITE-10080 Optimized Cache 6 long-running tests - Fixes #5243.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e340428
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e340428
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e340428

Branch: refs/heads/master
Commit: 0e3404281666668741fb8600d6e9021cccabe6d3
Parents: fe8c8cc
Author: pereslegin-pa <xx...@gmail.com>
Authored: Wed Nov 28 15:51:15 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Nov 28 15:51:15 2018 +0300

----------------------------------------------------------------------
 .../distributed/CacheExchangeMergeTest.java     |   4 +-
 ...ptimisticTxSuspendResumeMultiServerTest.java |  30 ---
 .../IgniteOptimisticTxSuspendResumeTest.java    | 228 +++++++++++++------
 ...OptimisticPrepareOnUnstableTopologyTest.java | 164 ++++++-------
 .../transactions/TxRollbackOnTimeoutTest.java   |   3 +-
 .../ignite/testframework/GridTestUtils.java     |   6 +-
 .../testsuites/IgniteCacheTestSuite6.java       |   2 -
 7 files changed, 238 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 03ea539..2dad0b5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -419,7 +419,9 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void concurrentStart(final boolean withClients) throws Exception {
-        for (int i = 0; i < 5; i++) {
+        int iterations = GridTestUtils.SF.applyLB(5, 1);
+
+        for (int i = 0; i < iterations; i++) {
             log.info("Iteration: " + i);
 
             startGrid(0);

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java
deleted file mode 100644
index b7003d4..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-/**
- *
- */
-public class IgniteOptimisticTxSuspendResumeMultiServerTest extends IgniteOptimisticTxSuspendResumeTest {
-    /**
-     * @return Number of server nodes.
-     */
-    @Override protected int serversNumber() {
-        return 4;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
index 66c204d..73a7dc1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
@@ -19,17 +19,21 @@ package org.apache.ignite.internal.processors.cache.distributed;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.PA;
@@ -62,7 +66,13 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
     private static final int FUT_TIMEOUT = 5000;
 
     /** */
-    private boolean client = false;
+    private static final int CLIENT_CNT = 2;
+
+    /** */
+    private static final int SERVER_CNT = 4;
+
+    /** */
+    private static final int GRID_CNT = CLIENT_CNT + SERVER_CNT;
 
     /**
      * List of closures to execute transaction operation that prohibited in suspended state.
@@ -109,6 +119,10 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
+        int idx = getTestIgniteInstanceIndex(igniteInstanceName);
+
+        boolean client = idx >= SERVER_CNT && idx < GRID_CNT;
+
         cfg.setClientMode(client);
 
         return cfg;
@@ -118,16 +132,21 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        startGrids(serversNumber());
+        startGridsMultiThreaded(gridCount());
+    }
 
-        if (serversNumber() > 1) {
-            client = true;
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
 
-            startGrid(serversNumber());
+        Ignite client = ignite(gridCount() - 1);
 
-            startGrid(serversNumber() + 1);
+        assertTrue(client.cluster().localNode().isClient());
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            grid(0).createCache(ccfg);
 
-            client = false;
+            client.createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
         }
 
         awaitPartitionMapExchange();
@@ -138,11 +157,19 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
         stopAllGrids(true);
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        for (CacheConfiguration ccfg : cacheConfigurations())
+            ignite(0).destroyCache(ccfg.getName());
+
+        super.afterTest();
+    }
+
     /**
      * @return Number of server nodes.
      */
-    protected int serversNumber() {
-        return 1;
+    protected int gridCount() {
+        return GRID_CNT;
     }
 
     /**
@@ -215,8 +242,8 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
         executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
             @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
                 for (TransactionIsolation isolation : TransactionIsolation.values()) {
-                    final IgniteCache<Integer, Integer> otherCache =
-                        ignite.getOrCreateCache(cacheConfiguration(PARTITIONED, 0, false).setName("otherCache"));
+                    final IgniteCache<Integer, Integer> otherCache = ignite.getOrCreateCache(
+                        cacheConfiguration("otherCache", PARTITIONED, 0, false));
 
                     final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
 
@@ -435,10 +462,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
 
                     tx.suspend();
 
-                    long start = U.currentTimeMillis();
-
-                    while(TX_TIMEOUT >= U.currentTimeMillis() - start)
-                        Thread.sleep(TX_TIMEOUT * 2);
+                    U.sleep(TX_TIMEOUT * 2);
 
                     GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
                         @Override public Object call() throws Exception {
@@ -475,10 +499,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
 
                     cache.put(1, 1);
 
-                    long start = U.currentTimeMillis();
-
-                    while(TX_TIMEOUT >= U.currentTimeMillis() - start)
-                        Thread.sleep(TX_TIMEOUT * 2);
+                    U.sleep(TX_TIMEOUT * 2);
 
                     GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
                         @Override public Object call() throws Exception {
@@ -599,33 +620,92 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
      * @throws Exception If failed.
      */
     public void testSuspendTxAndResumeAfterTopologyChange() throws Exception {
-        executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
-            @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+        Ignite srv = ignite(ThreadLocalRandom.current().nextInt(SERVER_CNT));
+        Ignite client = ignite(SERVER_CNT);
+        Ignite clientNear = ignite(SERVER_CNT + 1);
+
+        Map<String, List<List<Integer>>> cacheKeys = generateKeys(srv, TransactionIsolation.values().length);
+
+        doCheckSuspendTxAndResume(srv, cacheKeys);
+        doCheckSuspendTxAndResume(client, cacheKeys);
+        doCheckSuspendTxAndResume(clientNear, cacheKeys);
+    }
+
+    /**
+     * @param node Ignite instance.
+     * @param cacheKeys Different key types mapped to cache name.
+     * @throws Exception If failed.
+     */
+    private void doCheckSuspendTxAndResume(Ignite node, Map<String, List<List<Integer>>> cacheKeys) throws Exception {
+        ClusterNode locNode = node.cluster().localNode();
+
+        log.info("Run test for node [node=" + locNode.id() + ", client=" + locNode.isClient() + ']');
+
+        Map<IgniteCache<Integer, Integer>, Map<Transaction, Integer>> cacheTxMap = new IdentityHashMap<>();
+
+        for (Map.Entry<String, List<List<Integer>>> cacheKeysEntry : cacheKeys.entrySet()) {
+            String cacheName = cacheKeysEntry.getKey();
+
+            IgniteCache<Integer, Integer> cache = node.cache(cacheName);
+
+            Map<Transaction, Integer> suspendedTxs = new IdentityHashMap<>();
+
+            for (List<Integer> keysList : cacheKeysEntry.getValue()) {
                 for (TransactionIsolation isolation : TransactionIsolation.values()) {
-                    Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+                    Transaction tx = node.transactions().txStart(OPTIMISTIC, isolation);
 
-                    cache.put(1, 1);
+                    int key = keysList.get(isolation.ordinal());
+
+                    cache.put(key, key);
 
                     tx.suspend();
 
-                    assertEquals(SUSPENDED, tx.state());
+                    suspendedTxs.put(tx, key);
 
-                    try (IgniteEx g = startGrid(serversNumber() + 3)) {
-                        tx.resume();
+                    String msg = "node=" + node.cluster().localNode() +
+                        ", cache=" + cacheName + ", isolation=" + isolation + ", key=" + key;
 
-                        assertEquals(ACTIVE, tx.state());
+                    assertEquals(msg, SUSPENDED, tx.state());
+                }
+            }
 
-                        assertEquals(1, (int)cache.get(1));
+            cacheTxMap.put(cache, suspendedTxs);
+        }
 
-                        tx.commit();
+        int newNodeIdx = gridCount();
 
-                        assertEquals(1, (int)cache.get(1));
-                    }
+        startGrid(newNodeIdx);
 
-                    cache.removeAll();
+        try {
+            for (Map.Entry<IgniteCache<Integer, Integer>, Map<Transaction, Integer>>  entry : cacheTxMap.entrySet()) {
+                IgniteCache<Integer, Integer> cache = entry.getKey();
+
+                for (Map.Entry<Transaction, Integer> suspendedTx : entry.getValue().entrySet()) {
+                    Transaction tx = suspendedTx.getKey();
+
+                    Integer key = suspendedTx.getValue();
+
+                    tx.resume();
+
+                    String msg = "node=" + node.cluster().localNode() +
+                        ", cache=" + cache.getName() + ", isolation=" + tx.isolation() + ", key=" + key;
+
+                    assertEquals(msg, ACTIVE, tx.state());
+
+                    assertEquals(msg, key, cache.get(key));
+
+                    tx.commit();
+
+                    assertEquals(msg, key, cache.get(key));
                 }
             }
-        });
+        }
+        finally {
+            stopGrid(newNodeIdx);
+
+            for (IgniteCache<Integer, Integer> cache : cacheTxMap.keySet())
+                cache.removeAll();
+        }
     }
 
     /**
@@ -666,10 +746,10 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
     private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() {
         List<CacheConfiguration<Integer, Integer>> cfgs = new ArrayList<>();
 
-        cfgs.add(cacheConfiguration(PARTITIONED, 0, false));
-        cfgs.add(cacheConfiguration(PARTITIONED, 1, false));
-        cfgs.add(cacheConfiguration(PARTITIONED, 1, true));
-        cfgs.add(cacheConfiguration(REPLICATED, 0, false));
+        cfgs.add(cacheConfiguration("cache1", PARTITIONED, 0, false));
+        cfgs.add(cacheConfiguration("cache2", PARTITIONED, 1, false));
+        cfgs.add(cacheConfiguration("cache3", PARTITIONED, 1, true));
+        cfgs.add(cacheConfiguration("cache4", REPLICATED, 0, false));
 
         return cfgs;
     }
@@ -681,10 +761,11 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
      * @return Cache configuration.
      */
     private CacheConfiguration<Integer, Integer> cacheConfiguration(
+        String name,
         CacheMode cacheMode,
         int backups,
         boolean nearCache) {
-        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(name);
 
         ccfg.setCacheMode(cacheMode);
         ccfg.setAtomicityMode(TRANSACTIONAL);
@@ -701,37 +782,56 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
 
     /**
      * @param c Closure.
-     * @throws Exception If failed.
      */
-    private void executeTestForAllCaches(CI2<Ignite, IgniteCache<Integer, Integer>> c) throws Exception {
-        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
-            ignite(0).createCache(ccfg);
+    private void executeTestForAllCaches(CI2<Ignite, IgniteCache<Integer, Integer>> c) {
+        for (int i = 0; i < gridCount(); i++) {
+            Ignite ignite = ignite(i);
 
-            log.info("Run test for cache [cache=" + ccfg.getCacheMode() +
-                ", backups=" + ccfg.getBackups() +
-                ", near=" + (ccfg.getNearConfiguration() != null) + "]");
+            ClusterNode locNode = ignite.cluster().localNode();
 
-            awaitPartitionMapExchange();
+            log.info("Run test for node [node=" + locNode.id() + ", client=" + locNode.isClient() + ']');
 
-            int srvNum = serversNumber();
-            if (serversNumber() > 1) {
-                ignite(serversNumber() + 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
-                srvNum += 2;
-            }
+            for (CacheConfiguration ccfg : cacheConfigurations())
+                c.apply(ignite, ignite.cache(ccfg.getName()));
+        }
+    }
 
-            try {
-                for (int i = 0; i < srvNum; i++) {
-                    Ignite ignite = ignite(i);
+    /**
+     * Generates list of keys (primary, backup and neither primary nor backup).
+     *
+     * @param ignite Ignite instance.
+     * @param keysCnt The number of keys generated for each type of key.
+     * @return List of different keys mapped to cache name.
+     */
+    private Map<String, List<List<Integer>>> generateKeys(Ignite ignite, int keysCnt) {
+        Map<String, List<List<Integer>>> cacheKeys = new HashMap<>();
 
-                    log.info("Run test for node [node=" + i + ", client=" + ignite.configuration().isClientMode() + ']');
+        for (CacheConfiguration cfg : cacheConfigurations()) {
+            String cacheName = cfg.getName();
 
-                    c.apply(ignite, ignite.<Integer, Integer>cache(ccfg.getName()));
-                }
-            }
-            finally {
-                ignite(0).destroyCache(ccfg.getName());
+            IgniteCache cache = ignite.cache(cacheName);
+
+            List<List<Integer>> keys = new ArrayList<>();
+
+            // Generate different keys: 0 - primary, 1 - backup, 2 - neither primary nor backup.
+            for (int type = 0; type < 3; type++) {
+                if (type == 1 && cfg.getCacheMode() == PARTITIONED && cfg.getBackups() == 0)
+                    continue;
+
+                if (type == 2 && cfg.getCacheMode() == REPLICATED)
+                    continue;
+
+                List<Integer> keys0 = findKeys(cache, keysCnt, type * 100_000, type);
+
+                assertEquals(cacheName, keysCnt, keys0.size());
+
+                keys.add(keys0);
             }
+
+            cacheKeys.put(cacheName, keys);
         }
+
+        return cacheKeys;
     }
 
     /**
@@ -750,7 +850,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
          */
         public abstract void applyx(E1 e1, E2 e2) throws Exception;
 
-        /** {@inheritdoc} */
+        /** {@inheritDoc} */
         @Override public void apply(E1 e1, E2 e2) {
             try {
                 applyx(e1, e2);
@@ -775,7 +875,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
          */
         public abstract void applyx(T o) throws Exception;
 
-        /** {@inheritdoc} */
+        /** {@inheritDoc} */
         @Override public void apply(T o) {
             try {
                 applyx(o);
@@ -797,7 +897,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
          */
         public abstract void runx() throws Exception;
 
-        /** {@inheritdoc} */
+        /** {@inheritDoc} */
         @Override public void run() {
             try {
                 runx();

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticPrepareOnUnstableTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticPrepareOnUnstableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticPrepareOnUnstableTopologyTest.java
index 21dcf90..cbdcffe 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticPrepareOnUnstableTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticPrepareOnUnstableTopologyTest.java
@@ -17,22 +17,23 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.TreeMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -49,21 +50,17 @@ public class TxOptimisticPrepareOnUnstableTopologyTest extends GridCommonAbstrac
     /** */
     public static final String CACHE_NAME = "part_cache";
 
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
     /** */
-    private volatile boolean run = true;
+    private static final int STARTUP_DELAY = 500;
 
     /** */
-    private boolean client;
+    private static final int GRID_CNT = 4;
 
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-        assertEquals(0, G.allGrids().size());
-    }
+    /** */
+    private boolean client;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -110,58 +107,42 @@ public class TxOptimisticPrepareOnUnstableTopologyTest extends GridCommonAbstrac
      */
     private void doPrepareOnUnstableTopology(int keys, boolean testClient, TransactionIsolation isolation,
         long timeout) throws Exception {
-        Collection<Thread> threads = new ArrayList<>();
-
-        try {
-            // Start grid 1.
-            IgniteEx grid1 = startGrid(0);
-
-            assertFalse(grid1.configuration().isClientMode());
-
-            threads.add(runCacheOperations(grid1, isolation, timeout, keys));
-
-            TimeUnit.SECONDS.sleep(3L);
-
-            client = testClient; // If test client start on node in client mode.
-
-            // Start grid 2.
-            IgniteEx grid2 = startGrid(1);
-
-            assertEquals((Object)testClient, grid2.configuration().isClientMode());
+        GridCompoundFuture<Void, Object> compFut = new GridCompoundFuture<>();
 
-            client = false;
+        AtomicBoolean stopFlag = new AtomicBoolean();
 
-            threads.add(runCacheOperations(grid2, isolation, timeout, keys));
-
-            TimeUnit.SECONDS.sleep(3L);
-
-            // Start grid 3.
-            IgniteEx grid3 = startGrid(2);
+        try {
+            int clientIdx = testClient ? 1 : -1;
 
-            assertFalse(grid3.configuration().isClientMode());
+            try {
+                for (int i = 0; i < GRID_CNT; i++) {
+                    client = (clientIdx == i);
 
-            if (testClient)
-                log.info("Started client node: " + grid3.name());
+                    IgniteEx grid = startGrid(i);
 
-            threads.add(runCacheOperations(grid3, isolation, timeout, keys));
+                    assertEquals(client, grid.configuration().isClientMode().booleanValue());
 
-            TimeUnit.SECONDS.sleep(3L);
+                    client = false;
 
-            // Start grid 4.
-            IgniteEx grid4 = startGrid(3);
+                    IgniteInternalFuture<Void> fut = runCacheOperationsAsync(grid, stopFlag, isolation, timeout, keys);
 
-            assertFalse(grid4.configuration().isClientMode());
+                    compFut.add(fut);
 
-            threads.add(runCacheOperations(grid4, isolation, timeout, keys));
+                    U.sleep(STARTUP_DELAY);
+                }
+            }
+            finally {
+                stopFlag.set(true);
+            }
 
-            TimeUnit.SECONDS.sleep(3L);
+            compFut.markInitialized();
 
-            stopThreads(threads);
+            compFut.get();
 
-            for (int i = 0; i < 4; i++) {
+            for (int i = 0; i < GRID_CNT; i++) {
                 IgniteTxManager tm = ((IgniteKernal)grid(i)).internalCache(CACHE_NAME).context().tm();
 
-                assertEquals("txMap is not empty:" + i, 0, tm.idMapSize());
+                assertEquals("txMap is not empty: " + i, 0, tm.idMapSize());
             }
         }
         finally {
@@ -170,63 +151,50 @@ public class TxOptimisticPrepareOnUnstableTopologyTest extends GridCommonAbstrac
     }
 
     /**
-     * @param threads Thread which will be stopped.
-     */
-    private void stopThreads(Iterable<Thread> threads) {
-        try {
-            run = false;
-
-            for (Thread thread : threads)
-                thread.join();
-        }
-        catch (Exception e) {
-            U.error(log(), "Couldn't stop threads.", e);
-        }
-    }
-
-    /**
      * @param node Node.
      * @param isolation Isolation.
      * @param timeout Timeout.
      * @param keys Number of keys.
-     * @return Running thread.
+     * @return Future representing pending completion of the operation.
      */
-    private Thread runCacheOperations(Ignite node, TransactionIsolation isolation, long timeout, final int keys) {
-        Thread t = new Thread() {
-            @Override public void run() {
-                while (run) {
-                    TreeMap<Integer, String> vals = generateValues(keys);
-
-                    try {
-                        try (Transaction tx = node.transactions().txStart(TransactionConcurrency.OPTIMISTIC, isolation,
-                            timeout, keys)){
-
-                            IgniteCache<Object, Object> cache = node.cache(CACHE_NAME);
-
-                            // Put or remove.
-                            if (ThreadLocalRandom.current().nextDouble(1) < 0.65)
-                                cache.putAll(vals);
-                            else
-                                cache.removeAll(vals.keySet());
-
-                            tx.commit();
-                        }
-                        catch (Exception e) {
-                            U.error(log(), "Failed cache operation.", e);
-                        }
-
-                        U.sleep(100);
+    private IgniteInternalFuture<Void> runCacheOperationsAsync(
+        Ignite node,
+        AtomicBoolean stopFlag,
+        TransactionIsolation isolation,
+        long timeout,
+        final int keys
+    ) {
+        return GridTestUtils.runAsync(() -> {
+            while (!stopFlag.get()) {
+                TreeMap<Integer, String> vals = generateValues(keys);
+
+                try {
+                    try (Transaction tx = node.transactions().txStart(TransactionConcurrency.OPTIMISTIC, isolation,
+                        timeout, keys)) {
+
+                        IgniteCache<Object, Object> cache = node.cache(CACHE_NAME);
+
+                        // Put or remove.
+                        if (ThreadLocalRandom.current().nextDouble(1) < 0.65)
+                            cache.putAll(vals);
+                        else
+                            cache.removeAll(vals.keySet());
+
+                        tx.commit();
                     }
-                    catch (Exception e){
-                        U.error(log(), "Failed unlock.", e);
+                    catch (Exception e) {
+                        U.error(log(), "Failed cache operation.", e);
                     }
+
+                    U.sleep(100);
+                }
+                catch (Exception e) {
+                    U.error(log(), "Failed unlock.", e);
                 }
             }
-        };
-
-        t.start();
 
-        return t;
+            return null;
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
index 177444d..61e39ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
@@ -56,6 +56,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.SF;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -78,7 +79,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
  */
 public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
     /** */
-    private static final long DURATION = 60 * 1000L;
+    private static final long DURATION = SF.apply(60 * 1000);
 
     /** */
     private static final long TX_MIN_TIMEOUT = 1;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index dad5344..ee25b7a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -2024,7 +2024,7 @@ public final class GridTestUtils {
 
         /** */
         public static int apply(int val) {
-            return (int) (TEST_SCALE_FACTOR_VALUE * val);
+            return (int) Math.round(TEST_SCALE_FACTOR_VALUE * val);
         }
 
         /** */
@@ -2034,12 +2034,12 @@ public final class GridTestUtils {
 
         /** Apply scale factor with lower bound */
         public static int applyLB(int val, int lowerBound) {
-            return Math.max((int) (TEST_SCALE_FACTOR_VALUE * val), lowerBound);
+            return Math.max(apply(val), lowerBound);
         }
 
         /** Apply scale factor with upper bound */
         public static int applyUB(int val, int upperBound) {
-            return Math.min((int) (TEST_SCALE_FACTOR_VALUE * val), upperBound);
+            return Math.min(apply(val), upperBound);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 7bb476f..03cfb9f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheTryLockMulti
 import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThreadLocalTxTest;
-import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeMultiServerTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxLabelTest;
@@ -64,7 +63,6 @@ public class IgniteCacheTestSuite6 extends TestSuite {
 
         suite.addTestSuite(GridCachePartitionEvictionDuringReadThroughSelfTest.class);
         suite.addTestSuite(IgniteOptimisticTxSuspendResumeTest.class);
-        suite.addTestSuite(IgniteOptimisticTxSuspendResumeMultiServerTest.class);
         suite.addTestSuite(IgnitePessimisticTxSuspendResumeTest.class);
 
         suite.addTestSuite(CacheExchangeMergeTest.class);