You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/17 15:50:38 UTC

ignite git commit: IGNITE-8033 Fixed flaky failure of TxOptimisticDeadlockDetectionCrossCacheTest

Repository: ignite
Updated Branches:
  refs/heads/master 2edcb22fb -> 7b39f1355


IGNITE-8033 Fixed flaky failure of TxOptimisticDeadlockDetectionCrossCacheTest

Signed-off-by: Andrey Gura <ag...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b39f135
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b39f135
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b39f135

Branch: refs/heads/master
Commit: 7b39f1355cf7b0d4169622cec2936184168aba99
Parents: 2edcb22
Author: Aleksey Plekhanov <Pl...@gmail.com>
Authored: Tue Apr 17 18:27:53 2018 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Tue Apr 17 18:50:15 2018 +0300

----------------------------------------------------------------------
 ...timisticDeadlockDetectionCrossCacheTest.java | 147 +++++++------------
 1 file changed, 50 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7b39f135/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java
index 5d1374c..056b093 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java
@@ -18,30 +18,21 @@
 package org.apache.ignite.internal.processors.cache.transactions;
 
 import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -57,9 +48,6 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
  *
  */
 public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstractTest {
-    /** Nodes count. */
-    private static final int NODES_CNT = 2;
-
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -73,10 +61,6 @@ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstr
             cfg.setDiscoverySpi(discoSpi);
         }
 
-        TcpCommunicationSpi commSpi = new TestCommunicationSpi();
-
-        cfg.setCommunicationSpi(commSpi);
-
         CacheConfiguration ccfg0 = defaultCacheConfiguration();
 
         ccfg0.setName("cache0");
@@ -96,42 +80,46 @@ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstr
         return cfg;
     }
 
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        startGrids(NODES_CNT);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        super.afterTestsStopped();
-
-        stopAllGrids();
-    }
-
     /**
      * @throws Exception If failed.
      */
     public void testDeadlock() throws Exception {
-        // Sometimes boh transactions perform commit, so we repeat attempt.
-        while (!doTestDeadlock()) {}
+        startGrids(2);
+
+        try {
+            doTestDeadlock();
+        }
+        finally {
+            stopAllGrids();
+        }
     }
 
     /**
      * @throws Exception If failed.
      */
     private boolean doTestDeadlock() throws Exception {
-        TestCommunicationSpi.init(2);
-
-        final CyclicBarrier barrier = new CyclicBarrier(2);
-
         final AtomicInteger threadCnt = new AtomicInteger();
 
         final AtomicBoolean deadlock = new AtomicBoolean();
 
         final AtomicInteger commitCnt = new AtomicInteger();
 
+        grid(0).events().localListen(new CacheLocksListener(), EventType.EVT_CACHE_OBJECT_LOCKED);
+
+        AffinityTopologyVersion waitTopVer = new AffinityTopologyVersion(2, 1);
+
+        IgniteInternalFuture<?> exchFut = grid(0).context().cache().context().exchange().affinityReadyFuture(waitTopVer);
+
+        if (exchFut != null && !exchFut.isDone()) {
+            log.info("Waiting for topology exchange future [waitTopVer=" + waitTopVer + ", curTopVer="
+                + grid(0).context().cache().context().exchange().readyAffinityVersion() + ']');
+
+            exchFut.get();
+        }
+
+        log.info("Finished topology exchange future [curTopVer="
+            + grid(0).context().cache().context().exchange().readyAffinityVersion() + ']');
+
         IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
             @Override public void run() {
                 int threadNum = threadCnt.getAndIncrement();
@@ -152,8 +140,6 @@ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstr
 
                     cache1.put(key1, 0);
 
-                    barrier.await();
-
                     int key2 = primaryKey(cache2);
 
                     log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
@@ -171,23 +157,23 @@ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstr
                         hasCause(e, TransactionDeadlockException.class)
                         ) {
                         if (deadlock.compareAndSet(false, true))
-                            U.error(log, "At least one stack trace should contain " +
-                                TransactionDeadlockException.class.getSimpleName(), e);
+                            log.info("Successfully set deadlock flag");
+                        else
+                            log.info("Deadlock flag was already set");
                     }
+                    else
+                        log.warning("Got not deadlock exception", e);
                 }
             }
         }, 2, "tx-thread");
 
         fut.get();
 
-        if (commitCnt.get() == 2)
-            return false;
+        assertFalse("Commits must fail", commitCnt.get() == 2);
 
         assertTrue(deadlock.get());
 
-        for (int i = 0; i < NODES_CNT ; i++) {
-            Ignite ignite = ignite(i);
-
+        for (Ignite ignite : G.allGrids()) {
             IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm();
 
             Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures();
@@ -199,59 +185,26 @@ public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstr
     }
 
     /**
+     * Listener for cache lock events.
      *
+     * To guarantee a deadlock, it blocks each transaction thread until both threads hold their first lock.
      */
-    private static class TestCommunicationSpi extends TcpCommunicationSpi {
-        /** Tx count. */
-        private static volatile int TX_CNT;
-
-        /** Tx ids. */
-        private static final Set<GridCacheVersion> TX_IDS = new GridConcurrentHashSet<>();
-
-        /**
-         * @param txCnt Tx count.
-         */
-        private static void init(int txCnt) {
-            TX_CNT = txCnt;
-            TX_IDS.clear();
-        }
+    private static class CacheLocksListener implements IgnitePredicate<Event> {
+        /** Latch. */
+        private final CountDownLatch latch = new CountDownLatch(2);
 
         /** {@inheritDoc} */
-        @Override public void sendMessage(
-            final ClusterNode node,
-            final Message msg,
-            final IgniteInClosure<IgniteException> ackC
-        ) throws IgniteSpiException {
-            if (msg instanceof GridIoMessage) {
-                Message msg0 = ((GridIoMessage)msg).message();
-
-                if (msg0 instanceof GridNearTxPrepareRequest) {
-                    final GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg0;
-
-                    GridCacheVersion txId = req.version();
-
-                    if (TX_IDS.contains(txId)) {
-                        while (TX_IDS.size() < TX_CNT) {
-                            try {
-                                U.sleep(50);
-                            }
-                            catch (IgniteInterruptedCheckedException e) {
-                                e.printStackTrace();
-                            }
-                        }
-                    }
-                }
-                else if (msg0 instanceof GridNearTxPrepareResponse) {
-                    GridNearTxPrepareResponse res = (GridNearTxPrepareResponse)msg0;
+        @Override public boolean apply(Event evt) {
+            latch.countDown();
 
-                    GridCacheVersion txId = res.version();
-
-                    TX_IDS.add(txId);
-                }
+            try {
+                latch.await();
+            }
+            catch (InterruptedException e) {
+                e.printStackTrace();
             }
 
-            super.sendMessage(node, msg, ackC);
+            return true;
         }
     }
-
 }