You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/17 15:50:38 UTC
ignite git commit: IGNITE-8033 Fixed flaky failure of
TxOptimisticDeadlockDetectionCrossCacheTest
Repository: ignite
Updated Branches:
refs/heads/master 2edcb22fb -> 7b39f1355
IGNITE-8033 Fixed flaky failure of TxOptimisticDeadlockDetectionCrossCacheTest
Signed-off-by: Andrey Gura <ag...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b39f135
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b39f135
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b39f135
Branch: refs/heads/master
Commit: 7b39f1355cf7b0d4169622cec2936184168aba99
Parents: 2edcb22
Author: Aleksey Plekhanov <Pl...@gmail.com>
Authored: Tue Apr 17 18:27:53 2018 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Tue Apr 17 18:50:15 2018 +0300
----------------------------------------------------------------------
...timisticDeadlockDetectionCrossCacheTest.java | 147 +++++++------------
1 file changed, 50 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b39f135/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java
index 5d1374c..056b093 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java
@@ -18,30 +18,21 @@
package org.apache.ignite.internal.processors.cache.transactions;
import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -57,9 +48,6 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
*
*/
public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstractTest {
- /** Nodes count. */
- private static final int NODES_CNT = 2;
-
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -73,10 +61,6 @@ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstr
cfg.setDiscoverySpi(discoSpi);
}
- TcpCommunicationSpi commSpi = new TestCommunicationSpi();
-
- cfg.setCommunicationSpi(commSpi);
-
CacheConfiguration ccfg0 = defaultCacheConfiguration();
ccfg0.setName("cache0");
@@ -96,42 +80,46 @@ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstr
return cfg;
}
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- startGrids(NODES_CNT);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- super.afterTestsStopped();
-
- stopAllGrids();
- }
-
/**
* @throws Exception If failed.
*/
public void testDeadlock() throws Exception {
- // Sometimes boh transactions perform commit, so we repeat attempt.
- while (!doTestDeadlock()) {}
+ startGrids(2);
+
+ try {
+ doTestDeadlock();
+ }
+ finally {
+ stopAllGrids();
+ }
}
/**
* @throws Exception If failed.
*/
private boolean doTestDeadlock() throws Exception {
- TestCommunicationSpi.init(2);
-
- final CyclicBarrier barrier = new CyclicBarrier(2);
-
final AtomicInteger threadCnt = new AtomicInteger();
final AtomicBoolean deadlock = new AtomicBoolean();
final AtomicInteger commitCnt = new AtomicInteger();
+ grid(0).events().localListen(new CacheLocksListener(), EventType.EVT_CACHE_OBJECT_LOCKED);
+
+ AffinityTopologyVersion waitTopVer = new AffinityTopologyVersion(2, 1);
+
+ IgniteInternalFuture<?> exchFut = grid(0).context().cache().context().exchange().affinityReadyFuture(waitTopVer);
+
+ if (exchFut != null && !exchFut.isDone()) {
+ log.info("Waiting for topology exchange future [waitTopVer=" + waitTopVer + ", curTopVer="
+ + grid(0).context().cache().context().exchange().readyAffinityVersion() + ']');
+
+ exchFut.get();
+ }
+
+ log.info("Finished topology exchange future [curTopVer="
+ + grid(0).context().cache().context().exchange().readyAffinityVersion() + ']');
+
IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@Override public void run() {
int threadNum = threadCnt.getAndIncrement();
@@ -152,8 +140,6 @@ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstr
cache1.put(key1, 0);
- barrier.await();
-
int key2 = primaryKey(cache2);
log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
@@ -171,23 +157,23 @@ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstr
hasCause(e, TransactionDeadlockException.class)
) {
if (deadlock.compareAndSet(false, true))
- U.error(log, "At least one stack trace should contain " +
- TransactionDeadlockException.class.getSimpleName(), e);
+ log.info("Successfully set deadlock flag");
+ else
+ log.info("Deadlock flag was already set");
}
+ else
+ log.warning("Got not deadlock exception", e);
}
}
}, 2, "tx-thread");
fut.get();
- if (commitCnt.get() == 2)
- return false;
+ assertFalse("Commits must fail", commitCnt.get() == 2);
assertTrue(deadlock.get());
- for (int i = 0; i < NODES_CNT ; i++) {
- Ignite ignite = ignite(i);
-
+ for (Ignite ignite : G.allGrids()) {
IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm();
Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures();
@@ -199,59 +185,26 @@ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstr
}
/**
+ * Listener for cache lock events.
*
+ * To ensure deadlock this listener blocks transaction thread until both threads acquire first lock.
*/
- private static class TestCommunicationSpi extends TcpCommunicationSpi {
- /** Tx count. */
- private static volatile int TX_CNT;
-
- /** Tx ids. */
- private static final Set<GridCacheVersion> TX_IDS = new GridConcurrentHashSet<>();
-
- /**
- * @param txCnt Tx count.
- */
- private static void init(int txCnt) {
- TX_CNT = txCnt;
- TX_IDS.clear();
- }
+ private static class CacheLocksListener implements IgnitePredicate<Event> {
+ /** Latch. */
+ private final CountDownLatch latch = new CountDownLatch(2);
/** {@inheritDoc} */
- @Override public void sendMessage(
- final ClusterNode node,
- final Message msg,
- final IgniteInClosure<IgniteException> ackC
- ) throws IgniteSpiException {
- if (msg instanceof GridIoMessage) {
- Message msg0 = ((GridIoMessage)msg).message();
-
- if (msg0 instanceof GridNearTxPrepareRequest) {
- final GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg0;
-
- GridCacheVersion txId = req.version();
-
- if (TX_IDS.contains(txId)) {
- while (TX_IDS.size() < TX_CNT) {
- try {
- U.sleep(50);
- }
- catch (IgniteInterruptedCheckedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- else if (msg0 instanceof GridNearTxPrepareResponse) {
- GridNearTxPrepareResponse res = (GridNearTxPrepareResponse)msg0;
+ @Override public boolean apply(Event evt) {
+ latch.countDown();
- GridCacheVersion txId = res.version();
-
- TX_IDS.add(txId);
- }
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
}
- super.sendMessage(node, msg, ackC);
+ return true;
}
}
-
}