You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2020/07/15 18:28:48 UTC

[ignite] branch master updated: IGNITE-13251 Fixed deadlock between grid-timeout-worker and a thread opening a communication connection. Fixes #8033

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b8e9cc  IGNITE-13251 Fixed deadlock between grid-timeout-worker and a thread opening a communication connection. Fixes #8033
9b8e9cc is described below

commit 9b8e9cc384ab92d9811e158344516c927ddcaebc
Author: Alexander Lapin <la...@gmail.com>
AuthorDate: Wed Jul 15 21:28:08 2020 +0300

    IGNITE-13251 Fixed deadlock between grid-timeout-worker and a thread opening a communication connection. Fixes #8033
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../cache/GridCachePartitionExchangeManager.java   | 43 +++++++++++------
 .../cache/transactions/IgniteTxManager.java        |  3 +-
 .../managers/IgniteDiagnosticMessagesTest.java     | 56 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 16 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d8f95db..d569151 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
@@ -290,6 +291,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** Metric that shows whether cluster is in fully rebalanced state. */
     private volatile BooleanMetricImpl rebalanced;
 
+    /** Serializes long running operations dumps: a caller that fails {@code tryLock()} returns without dumping. */
+    private final ReentrantLock dumpLongRunningOpsLock = new ReentrantLock();
+
     /** Discovery listener. */
     private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
         @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -2376,29 +2380,38 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             if (lastFut != null && !lastFut.isDone())
                 return;
 
-            if (U.currentTimeMillis() < nextLongRunningOpsDumpTime)
+            if (!dumpLongRunningOpsLock.tryLock())
                 return;
 
-            if (dumpLongRunningOperations0(timeout)) {
-                nextLongRunningOpsDumpTime = U.currentTimeMillis() + nextDumpTimeout(longRunningOpsDumpStep++, timeout);
+            try {
+                if (U.currentTimeMillis() < nextLongRunningOpsDumpTime)
+                    return;
+
+                if (dumpLongRunningOperations0(timeout)) {
+                    nextLongRunningOpsDumpTime = U.currentTimeMillis() +
+                        nextDumpTimeout(longRunningOpsDumpStep++, timeout);
 
-                if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) {
-                    U.warn(diagnosticLog, "Found long running cache operations, dump threads.");
+                    if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) {
+                        U.warn(diagnosticLog, "Found long running cache operations, dump threads.");
 
-                    U.dumpThreads(diagnosticLog);
-                }
+                        U.dumpThreads(diagnosticLog);
+                    }
 
-                if (IgniteSystemProperties.getBoolean(IGNITE_IO_DUMP_ON_TIMEOUT, false)) {
-                    U.warn(diagnosticLog, "Found long running cache operations, dump IO statistics.");
+                    if (IgniteSystemProperties.getBoolean(IGNITE_IO_DUMP_ON_TIMEOUT, false)) {
+                        U.warn(diagnosticLog, "Found long running cache operations, dump IO statistics.");
 
-                    // Dump IO manager statistics.
-                    if (IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_IO_DUMP_ON_TIMEOUT, false))
-                        cctx.gridIO().dumpStats();
+                        // Dump IO manager statistics.
+                        if (IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_IO_DUMP_ON_TIMEOUT, false))
+                            cctx.gridIO().dumpStats();
+                    }
+                }
+                else {
+                    nextLongRunningOpsDumpTime = 0;
+                    longRunningOpsDumpStep = 0;
                 }
             }
-            else {
-                nextLongRunningOpsDumpTime = 0;
-                longRunningOpsDumpStep = 0;
+            finally {
+                dumpLongRunningOpsLock.unlock();
             }
         }
         catch (Exception e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 459b2f5..df31df9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2153,7 +2153,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         scheduleDumpTask(
             IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT,
-            () -> cctx.kernalContext().cache().context().exchange().dumpLongRunningOperations(longOpsDumpTimeout),
+            () -> cctx.kernalContext().closure().runLocalSafe(
+                () -> cctx.kernalContext().cache().context().exchange().dumpLongRunningOperations(longOpsDumpTimeout)),
             longOpsDumpTimeout);
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
index 54e0836..df6dcf7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
@@ -22,6 +22,8 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
@@ -67,6 +69,7 @@ import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
 
 /**
  *
@@ -423,6 +426,59 @@ public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks that the timeout scheduler keeps running tasks while the long running operations dump is blocked.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDumpLongRunningOperationDoesntBlockTimeoutWorker() throws Exception {
+        long longOpsDumpTimeout = 100;
+
+        IgniteEx ignite = startGrid(0);
+
+        IgniteCache cache = ignite.createCache(new CacheConfiguration<>("txCache").
+            setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        ignite.transactions().txStart(PESSIMISTIC, SERIALIZABLE);
+
+        cache.put(1, 1);
+
+        // Sleep for twice the dump timeout so that the open transaction counts as long running.
+        Thread.sleep(longOpsDumpTimeout * 2);
+
+        // Holding the gateway write lock is meant to block the long running transaction dump on the line
+        // {@code ClusterGroup nearNode = ignite.cluster().forNodeId(nearNodeId);}
+        ignite.context().gateway().writeLock();
+
+        try {
+            ignite.context().cache().context().tm().longOperationsDumpTimeout(100);
+
+            // Give the dump task time to start dumping the long running transaction.
+            Thread.sleep(longOpsDumpTimeout * 2);
+
+            AtomicBoolean schedulerAssertionFlag = new AtomicBoolean(false);
+
+            CountDownLatch scheduleLatch = new CountDownLatch(1);
+
+            ignite.context().timeout().schedule(
+                () -> {
+                    schedulerAssertionFlag.set(true);
+                    scheduleLatch.countDown();
+                },
+                0,
+                -1);
+
+            scheduleLatch.await(5_000, TimeUnit.MILLISECONDS);
+
+            // The scheduled task must have run, i.e. the blocked dump did not stall the scheduler.
+            assertTrue(schedulerAssertionFlag.get());
+        }
+        finally {
+            ignite.context().gateway().writeUnlock();
+        }
+    }
+
+    /**
      * @param atomicityMode Cache atomicity mode.
      * @throws Exception If failed.
      */