You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/10/13 17:42:47 UTC

[02/50] [abbrv] ignite git commit: ignite-6262 Flaky deadlock detection tests are fixed

ignite-6262 Flaky deadlock detection tests are fixed

Signed-off-by: Andrey Gura <ag...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4a1e90c6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4a1e90c6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4a1e90c6

Branch: refs/heads/ignite-2.1.5-p1
Commit: 4a1e90c654ce8f169960b6ef5035cdb9fa837a19
Parents: 542f2c2
Author: Vitaliy Biryukov <Bi...@gmail.com>
Authored: Tue Sep 19 13:23:45 2017 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Tue Sep 19 13:31:51 2017 +0300

----------------------------------------------------------------------
 .../AbstractDeadlockDetectionTest.java          |  65 ++++++
 .../TxOptimisticDeadlockDetectionTest.java      |  54 +----
 .../TxPessimisticDeadlockDetectionTest.java     | 223 +++++--------------
 3 files changed, 116 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1e90c6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractDeadlockDetectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractDeadlockDetectionTest.java
index 7381f5a..c0034f5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractDeadlockDetectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractDeadlockDetectionTest.java
@@ -17,13 +17,25 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
+import java.util.Collection;
+import java.util.Set;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
@@ -35,6 +47,59 @@ import java.util.List;
  */
 public abstract class AbstractDeadlockDetectionTest extends GridCommonAbstractTest {
     /**
+     * Checks on nodes {@code 0..nodesCnt-1} that transactions and futures are completed and entries are not locked.
+     * @param involvedKeys Keys whose {@code cacheName} entries must hold no locks ({@code mvccAllLocal() == null}).
+     */
+    protected void checkAllTransactionsCompleted(Set<Object> involvedKeys, int nodesCnt, String cacheName) {
+        boolean fail = false;
+
+        for (int i = 0; i < nodesCnt; i++) { // Nodes are addressed by index through ignite(i).
+            Ignite ignite = ignite(i);
+
+            int cacheId = ((IgniteCacheProxy)ignite.cache(cacheName)).context().cacheId();
+
+            GridCacheSharedContext<Object, Object> cctx = ((IgniteKernal)ignite).context().cache().context();
+
+            IgniteTxManager txMgr = cctx.tm();
+
+            Collection<IgniteInternalTx> activeTxs = txMgr.activeTransactions();
+
+            for (IgniteInternalTx tx : activeTxs) {
+                Collection<IgniteTxEntry> entries = tx.allEntries();
+
+                for (IgniteTxEntry entry : entries) {
+                    if (entry.cacheId() == cacheId) { // Only transactions with an entry of the checked cache count.
+                        fail = true; // Remembered here; fail() is called after the loop over nodes.
+
+                        U.error(log, "Transaction still exists: " + "\n" + tx.xidVersion() +
+                            "\n" + tx.nearXidVersion() + "\n nodeId=" + cctx.localNodeId() + "\n tx=" + tx);
+                    }
+                }
+            }
+
+            Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures();
+
+            assertTrue(futs.isEmpty()); // No deadlock detection futures may be left on this node.
+
+            GridCacheAdapter<Object, Integer> intCache = internalCache(i, cacheName);
+
+            GridCacheConcurrentMap map = intCache.map();
+
+            for (Object key : involvedKeys) {
+                KeyCacheObject keyCacheObj = intCache.context().toCacheKeyObject(key);
+
+                GridCacheMapEntry entry = map.getEntry(intCache.context(), keyCacheObj);
+
+                if (entry != null) // Keys without a map entry on this node are skipped.
+                    assertNull("Entry still has locks " + entry, entry.mvccAllLocal());
+            }
+        }
+
+        if (fail)
+            fail("Some transactions still exist");
+    }
+
+    /**
      * @param cache Cache.
      * @param cnt Keys count.
      * @param startFrom Start value for keys search.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1e90c6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
index 24e7802..3414227 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.transactions;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -41,12 +40,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
-import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -350,53 +344,7 @@ public class TxOptimisticDeadlockDetectionTest extends AbstractDeadlockDetection
 
         assertNotNull("Failed to detect deadlock", deadlockE);
 
-        boolean fail = false;
-
-        // Check transactions, futures and entry locks state.
-        for (int i = 0; i < NODES_CNT * 2; i++) {
-            Ignite ignite = ignite(i);
-
-            int cacheId = ((IgniteCacheProxy)ignite.cache(CACHE_NAME)).context().cacheId();
-
-            GridCacheSharedContext<Object, Object> cctx = ((IgniteKernal)ignite).context().cache().context();
-
-            IgniteTxManager txMgr = cctx.tm();
-
-            Collection<IgniteInternalTx> activeTxs = txMgr.activeTransactions();
-
-            for (IgniteInternalTx tx : activeTxs) {
-                Collection<IgniteTxEntry> entries = tx.allEntries();
-
-                for (IgniteTxEntry entry : entries) {
-                    if (entry.cacheId() == cacheId) {
-                        fail = true;
-
-                        U.error(log, "Transaction still exists: " + "\n" + tx.xidVersion() +
-                            "\n" + tx.nearXidVersion() + "\n nodeId=" + cctx.localNodeId() + "\n tx=" + tx);
-                    }
-                }
-            }
-
-            Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures();
-
-            assertTrue(futs.isEmpty());
-
-            GridCacheAdapter<Object, Integer> intCache = internalCache(i, CACHE_NAME);
-
-            GridCacheConcurrentMap map = intCache.map();
-
-            for (Object key : involvedKeys) {
-                KeyCacheObject keyCacheObj = intCache.context().toCacheKeyObject(key);
-
-                GridCacheMapEntry entry = map.getEntry(intCache.context(), keyCacheObj);
-
-                if (entry != null)
-                    assertNull("Entry still has locks " + entry, entry.mvccAllLocal());
-            }
-        }
-
-        if (fail)
-            fail("Some transactions still exist");
+        checkAllTransactionsCompleted(involvedKeys, NODES_CNT * 2, CACHE_NAME);
 
         // Check deadlock report
         String msg = deadlockE.getMessage();

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1e90c6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java
index 61f7125..82fa52c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java
@@ -17,9 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
-import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -41,18 +39,11 @@ import org.apache.ignite.configuration.MemoryPolicyConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
-import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionDeadlockException;
 import org.apache.ignite.transactions.TransactionTimeoutException;
@@ -69,18 +60,18 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
 /**
  * Tests deadlock detection for pessimistic transactions.
  */
-public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
+public class TxPessimisticDeadlockDetectionTest extends AbstractDeadlockDetectionTest {
     /** Cache name. */
     private static final String CACHE_NAME = "cache";
 
     /** Nodes count (actually two times more nodes will started: server + client). */
     private static final int NODES_CNT = 4;
 
-    /** No op transformer. */
-    private static final NoOpTransformer NO_OP_TRANSFORMER = new NoOpTransformer();
+    /** Ordinal start key. */
+    private static final Integer ORDINAL_START_KEY = 1;
 
-    /** Wrapping transformer. */
-    private static final WrappingTransformer WRAPPING_TRANSFORMER = new WrappingTransformer();
+    /** Custom start key. */
+    private static final IncrementalTestObject CUSTOM_START_KEY = new KeyObject(1);
 
     /** Client mode flag. */
     private static boolean client;
@@ -141,8 +132,8 @@ public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
      */
     public void testDeadlocksPartitioned() throws Exception {
         for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
-            doTestDeadlocks(createCache(PARTITIONED, syncMode, false), NO_OP_TRANSFORMER);
-            doTestDeadlocks(createCache(PARTITIONED, syncMode, false), WRAPPING_TRANSFORMER);
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, false), ORDINAL_START_KEY);
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, false), CUSTOM_START_KEY);
         }
     }
 
@@ -151,8 +142,8 @@ public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
      */
     public void testDeadlocksPartitionedNear() throws Exception {
         for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
-            doTestDeadlocks(createCache(PARTITIONED, syncMode, true), NO_OP_TRANSFORMER);
-            doTestDeadlocks(createCache(PARTITIONED, syncMode, true), WRAPPING_TRANSFORMER);
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, true), ORDINAL_START_KEY);
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, true), CUSTOM_START_KEY);
         }
     }
 
@@ -161,8 +152,8 @@ public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
      */
     public void testDeadlocksReplicated() throws Exception {
         for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
-            doTestDeadlocks(createCache(REPLICATED, syncMode, false), NO_OP_TRANSFORMER);
-            doTestDeadlocks(createCache(REPLICATED, syncMode, false), WRAPPING_TRANSFORMER);
+            doTestDeadlocks(createCache(REPLICATED, syncMode, false), ORDINAL_START_KEY);
+            doTestDeadlocks(createCache(REPLICATED, syncMode, false), CUSTOM_START_KEY);
         }
     }
 
@@ -178,8 +169,8 @@ public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
 
                 awaitPartitionMapExchange();
 
-                doTestDeadlock(2, true, true, false, NO_OP_TRANSFORMER);
-                doTestDeadlock(2, true, true, false, WRAPPING_TRANSFORMER);
+                doTestDeadlock(2, true, true, false, ORDINAL_START_KEY);
+                doTestDeadlock(2, true, true, false, CUSTOM_START_KEY);
             }
             finally {
                 if (cache != null)
@@ -225,21 +216,21 @@ public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    private void doTestDeadlocks(IgniteCache cache, IgniteClosure<Integer, Object> transformer) throws Exception {
+    private void doTestDeadlocks(IgniteCache cache, Object startKey) throws Exception {
         try {
             awaitPartitionMapExchange();
 
-            doTestDeadlock(2, false, true, true, transformer);
-            doTestDeadlock(2, false, false, false, transformer);
-            doTestDeadlock(2, false, false, true, transformer);
+            doTestDeadlock(2, false, true, true, startKey);
+            doTestDeadlock(2, false, false, false, startKey);
+            doTestDeadlock(2, false, false, true, startKey);
 
-            doTestDeadlock(3, false, true, true, transformer);
-            doTestDeadlock(3, false, false, false, transformer);
-            doTestDeadlock(3, false, false, true, transformer);
+            doTestDeadlock(3, false, true, true, startKey);
+            doTestDeadlock(3, false, false, false, startKey);
+            doTestDeadlock(3, false, false, true, startKey);
 
-            doTestDeadlock(4, false, true, true, transformer);
-            doTestDeadlock(4, false, false, false, transformer);
-            doTestDeadlock(4, false, false, true, transformer);
+            doTestDeadlock(4, false, true, true, startKey);
+            doTestDeadlock(4, false, false, false, startKey);
+            doTestDeadlock(4, false, false, true, startKey);
         }
         catch (Exception e) {
             U.error(log, "Unexpected exception: ", e);
@@ -260,10 +251,10 @@ public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
         final boolean loc,
         boolean lockPrimaryFirst,
         final boolean clientTx,
-        final IgniteClosure<Integer, Object> transformer
+        final Object startKey
     ) throws Exception {
         log.info(">>> Test deadlock [txCnt=" + txCnt + ", loc=" + loc + ", lockPrimaryFirst=" + lockPrimaryFirst +
-            ", clientTx=" + clientTx + ", transformer=" + transformer.getClass().getName() + ']');
+            ", clientTx=" + clientTx + ", startKey=" + startKey.getClass().getName() + ']');
 
         final AtomicInteger threadCnt = new AtomicInteger();
 
@@ -271,10 +262,10 @@ public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
 
         final AtomicReference<TransactionDeadlockException> deadlockErr = new AtomicReference<>();
 
-        final List<List<Integer>> keySets = generateKeys(txCnt, loc, !lockPrimaryFirst);
+        final List<List<Object>> keySets = generateKeys(txCnt, startKey, loc, !lockPrimaryFirst);
 
-        final Set<Integer> involvedKeys = new GridConcurrentHashSet<>();
-        final Set<Integer> involvedLockedKeys = new GridConcurrentHashSet<>();
+        final Set<Object> involvedKeys = new GridConcurrentHashSet<>();
+        final Set<Object> involvedLockedKeys = new GridConcurrentHashSet<>();
         final Set<IgniteInternalTx> involvedTxs = new GridConcurrentHashSet<>();
 
         IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@@ -285,23 +276,23 @@ public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
 
                 IgniteCache<Object, Integer> cache = ignite.cache(CACHE_NAME);
 
-                List<Integer> keys = keySets.get(threadNum - 1);
+                List<Object> keys = keySets.get(threadNum - 1);
 
                 int txTimeout = 500 + txCnt * 100;
 
                 try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, txTimeout, 0)) {
                     involvedTxs.add(((TransactionProxyImpl)tx).tx());
 
-                    Integer key = keys.get(0);
+                    Object key = keys.get(0);
 
                     involvedKeys.add(key);
 
                     Object k;
 
                     log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
-                        ", tx=" + tx + ", key=" + transformer.apply(key) + ']');
+                        ", tx=" + tx + ", key=" + key + ']');
 
-                    cache.put(transformer.apply(key), 0);
+                    cache.put(key, 0);
 
                     involvedLockedKeys.add(key);
 
@@ -312,23 +303,23 @@ public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
                     ClusterNode primaryNode =
                         ((IgniteCacheProxy)cache).context().affinity().primaryByKey(key, NONE);
 
-                    List<Integer> primaryKeys =
-                        primaryKeys(grid(primaryNode).cache(CACHE_NAME), 5, key + (100 * threadNum));
+                    List<Object> primaryKeys =
+                        primaryKeys(grid(primaryNode).cache(CACHE_NAME), 5, incrementKey(key, 100 * threadNum));
 
                     Map<Object, Integer> entries = new HashMap<>();
 
                     involvedKeys.add(key);
 
-                    entries.put(transformer.apply(key), 0);
+                    entries.put(key, 0);
 
-                    for (Integer i : primaryKeys) {
-                        involvedKeys.add(i);
+                    for (Object o : primaryKeys) {
+                        involvedKeys.add(o);
 
-                        entries.put(transformer.apply(i), 1);
+                        entries.put(o, 1);
 
-                        k = transformer.apply(i + 13);
+                        k = incrementKey(o, + 13);
 
-                        involvedKeys.add(i + 13);
+                        involvedKeys.add(k);
 
                         entries.put(k, 2);
                     }
@@ -368,55 +359,7 @@ public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
 
         assertNotNull(deadlockE);
 
-        boolean fail = false;
-
-        // Check transactions, futures and entry locks state.
-        for (int i = 0; i < NODES_CNT * 2; i++) {
-            Ignite ignite = ignite(i);
-
-            int cacheId = ((IgniteCacheProxy)ignite.cache(CACHE_NAME)).context().cacheId();
-
-            GridCacheSharedContext<Object, Object> cctx = ((IgniteKernal)ignite).context().cache().context();
-
-            IgniteTxManager txMgr = cctx.tm();
-
-            Collection<IgniteInternalTx> activeTxs = txMgr.activeTransactions();
-
-            for (IgniteInternalTx tx : activeTxs) {
-                Collection<IgniteTxEntry> entries = tx.allEntries();
-
-                for (IgniteTxEntry entry : entries) {
-                    if (entry.cacheId() == cacheId) {
-                        fail = true;
-
-                        U.error(log, "Transaction still exists: " + "\n" + tx.xidVersion() +
-                            "\n" + tx.nearXidVersion() + "\n nodeId=" + cctx.localNodeId() + "\n tx=" + tx);
-                    }
-                }
-            }
-
-            Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures();
-
-            assertTrue(futs.isEmpty());
-
-            GridCacheAdapter<Object, Integer> intCache = internalCache(i, CACHE_NAME);
-
-            GridCacheConcurrentMap map = intCache.map();
-
-            for (Integer key : involvedKeys) {
-                Object key0 = transformer.apply(key);
-
-                KeyCacheObject keyCacheObj = intCache.context().toCacheKeyObject(key0);
-
-                GridCacheMapEntry entry = map.getEntry(intCache.context(), keyCacheObj);
-
-                if (entry != null)
-                    assertNull("Entry still has locks " + entry, entry.mvccAllLocal());
-            }
-        }
-
-        if (fail)
-            fail("Some transactions still exist");
+        checkAllTransactionsCompleted(involvedKeys, NODES_CNT * 2, CACHE_NAME);
 
         // Check deadlock report
         String msg = deadlockE.getMessage();
@@ -425,11 +368,11 @@ public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
             assertTrue(msg.contains(
                 "[txId=" + tx.xidVersion() + ", nodeId=" + tx.nodeId() + ", threadId=" + tx.threadId() + ']'));
 
-        for (Integer key : involvedKeys) {
+        for (Object key : involvedKeys) {
             if (involvedLockedKeys.contains(key))
-                assertTrue(msg.contains("[key=" + transformer.apply(key) + ", cache=" + CACHE_NAME + ']'));
+                assertTrue(msg.contains("[key=" + key + ", cache=" + CACHE_NAME + ']'));
             else
-                assertFalse(msg.contains("[key=" + transformer.apply(key)));
+                assertFalse(msg.contains("[key=" + key));
         }
     }
 
@@ -437,11 +380,11 @@ public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
      * @param nodesCnt Nodes count.
      * @param loc Local cache.
      */
-    private List<List<Integer>> generateKeys(int nodesCnt, boolean loc, boolean reverse) throws IgniteCheckedException {
-        List<List<Integer>> keySets = new ArrayList<>();
+    private <T> List<List<T>> generateKeys(int nodesCnt, T startKey, boolean loc, boolean reverse) throws IgniteCheckedException {
+        List<List<T>> keySets = new ArrayList<>();
 
         if (loc) {
-            List<Integer> keys = primaryKeys(ignite(0).cache(CACHE_NAME), 2);
+            List<T> keys = primaryKeys(ignite(0).cache(CACHE_NAME), 2, startKey);
 
             keySets.add(new ArrayList<>(keys));
 
@@ -451,10 +394,10 @@ public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
         }
         else {
             for (int i = 0; i < nodesCnt; i++) {
-                List<Integer> keys = new ArrayList<>(2);
+                List<T> keys = new ArrayList<>(2);
 
-                keys.add(primaryKey(ignite(i).cache(CACHE_NAME)));
-                keys.add(primaryKey(ignite(i == nodesCnt - 1 ? 0 : i + 1).cache(CACHE_NAME)));
+                keys.add(primaryKey(ignite(i).cache(CACHE_NAME), startKey));
+                keys.add(primaryKey(ignite(i == nodesCnt - 1 ? 0 : i + 1).cache(CACHE_NAME), startKey));
 
                 if (reverse)
                     Collections.reverse(keys);
@@ -465,70 +408,4 @@ public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
 
         return keySets;
     }
-
-    /**
-     *
-     */
-    private static class NoOpTransformer implements IgniteClosure<Integer, Object> {
-        /** {@inheritDoc} */
-        @Override public Object apply(Integer val) {
-            return val;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class WrappingTransformer implements IgniteClosure<Integer, Object> {
-        /** {@inheritDoc} */
-        @Override public Object apply(Integer val) {
-            return new KeyObject(val);
-        }
-    }
-
-    /**
-     *
-     */
-    private static class KeyObject implements Serializable {
-        /** Id. */
-        private int id;
-
-        /** Name. */
-        private String name;
-
-        /**
-         * @param id Id.
-         */
-        public KeyObject(int id) {
-            this.id = id;
-            this.name = "KeyObject" + id;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "KeyObject{" +
-                "id=" + id +
-                ", name='" + name + '\'' +
-                '}';
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            KeyObject obj = (KeyObject)o;
-
-            return id == obj.id && name.equals(obj.name);
-
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return id;
-        }
-    }
 }