You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2019/12/17 13:09:46 UTC

[ignite] branch master updated: IGNITE-12434 Dump checkpoint readLock holder threads if writeLock can't take lock more than threshold timeout. - Fixes #7124.

This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a09d128  IGNITE-12434 Dump checkpoint readLock holder threads if writeLock can't take lock more than threshold timeout. - Fixes #7124.
a09d128 is described below

commit a09d1284a8f5cbaae7a8548af6c32ca422810035
Author: zstan <st...@gmail.com>
AuthorDate: Tue Dec 17 16:09:21 2019 +0300

    IGNITE-12434 Dump checkpoint readLock holder threads if writeLock can't take lock more than threshold timeout. - Fixes #7124.
    
    Signed-off-by: Ivan Rakov <ir...@apache.org>
---
 .../GridCacheDatabaseSharedManager.java            |  11 +-
 .../apache/ignite/internal/util/IgniteUtils.java   | 188 +++++++++++----------
 .../persistence/CheckpointReadLockFailureTest.java | 144 +++++++++++++++-
 3 files changed, 250 insertions(+), 93 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 2eeb330..a89919b 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -226,6 +226,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** */
     public static final String IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP = "IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP";
 
+    /** Log read lock holders. */
+    public static final String IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS = "IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS";
+
     /** MemoryPolicyConfiguration name reserved for meta store. */
     public static final String METASTORE_DATA_REGION_NAME = "metastoreMemPlc";
 
@@ -242,6 +245,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** */
     private final boolean skipCheckpointOnNodeStop = getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false);
 
+    /** */
+    private final boolean logReadLockHolders = getBoolean(IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS);
+
     /**
      * Starting from this number of dirty pages in checkpoint, array will be sorted with
      * {@link Arrays#parallelSort(Comparable[])} in case of {@link CheckpointWriteOrder#SEQUENTIAL}.
@@ -297,7 +303,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** For testing only. */
     private volatile GridFutureAdapter<Void> enableChangeApplied;
 
-    /** */
+    /** Checkpoint lock. */
     ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
 
     /** */
@@ -544,6 +550,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
         final GridKernalContext kernalCtx = cctx.kernalContext();
 
+        if (logReadLockHolders)
+            checkpointLock = new U.ReentrantReadWriteLockTracer(checkpointLock, kernalCtx, 5_000);
+
         if (!kernalCtx.clientNode()) {
             kernalCtx.internalSubscriptionProcessor().registerDatabaseListener(new MetastorageRecoveryLifecycle());
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 34ef7be..1b51d45 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -136,9 +136,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.function.Consumer;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.jar.JarFile;
 import java.util.logging.ConsoleHandler;
 import java.util.logging.Handler;
@@ -234,6 +234,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -339,6 +340,9 @@ public abstract class IgniteUtils {
     /** Default user version. */
     public static final String DFLT_USER_VERSION = "0";
 
+    /** Lock hold message. */
+    public static final String LOCK_HOLD_MESSAGE = "ReadLock held the lock more than ";
+
     /** Cache for {@link GridPeerDeployAware} fields to speed up reflection. */
     private static final ConcurrentMap<String, IgniteBiTuple<Class<?>, Collection<Field>>> p2pFields =
         new ConcurrentHashMap<>();
@@ -10824,20 +10828,6 @@ public abstract class IgniteUtils {
     }
 
     /**
-     * @param lock Lock.
-     */
-    public static ReentrantReadWriteLockTracer lockTracer(ReadWriteLock lock) {
-        return new ReentrantReadWriteLockTracer(lock);
-    }
-
-    /**
-     * @param lock Lock.
-     */
-    public static LockTracer lockTracer(Lock lock) {
-        return new LockTracer(lock);
-    }
-
-    /**
      * Puts additional text to thread name.
      * Calls {@code enhanceThreadName(Thread.currentThread(), text)}.
      * For details see {@link #enhanceThreadName(Thread, String)}.
@@ -11388,102 +11378,125 @@ public abstract class IgniteUtils {
         };
     }
 
-    /**
-     *
-     */
-    public static class ReentrantReadWriteLockTracer implements ReadWriteLock {
+    /** */
+    public static class ReentrantReadWriteLockTracer extends ReentrantReadWriteLock {
+        /** */
+        private static final long serialVersionUID = 0L;
+
         /** Read lock. */
-        private final LockTracer readLock;
+        private final ReadLockTracer readLock;
 
         /** Write lock. */
-        private final LockTracer writeLock;
+        private final WriteLockTracer writeLock;
+
+        /** Lock print threshold. */
+        private long readLockThreshold;
+
+        /** */
+        private IgniteLogger log;
 
         /**
-         * @param delegate Delegate.
+         * @param delegate RWLock delegate.
+         * @param kctx Kernal context.
+         * @param readLockThreshold ReadLock threshold timeout.
+         *
          */
-        public ReentrantReadWriteLockTracer(ReadWriteLock delegate) {
-            readLock = new LockTracer(delegate.readLock());
-            writeLock = new LockTracer(delegate.writeLock());
+        public ReentrantReadWriteLockTracer(ReentrantReadWriteLock delegate, GridKernalContext kctx, long readLockThreshold) {
+            log = kctx.cache().context().logger(getClass());
+
+            readLock = new ReadLockTracer(delegate, log, readLockThreshold);
+
+            writeLock = new WriteLockTracer(delegate);
+
+            this.readLockThreshold = readLockThreshold;
         }
 
         /** {@inheritDoc} */
-        @NotNull @Override public Lock readLock() {
+        @Override public ReadLock readLock() {
             return readLock;
         }
 
         /** {@inheritDoc} */
-        @NotNull @Override public Lock writeLock() {
+        @Override public WriteLock writeLock() {
             return writeLock;
         }
 
-        /**
-         *
-         */
-        public LockTracer getReadLock() {
-            return readLock;
-        }
-
-        /**
-         *
-         */
-        public LockTracer getWriteLock() {
-            return writeLock;
+        /** */
+        public long lockWaitThreshold() {
+            return readLockThreshold;
         }
     }
 
-    /**
-     *
-     */
-    public static class LockTracer implements Lock {
+    /** */
+    private static class ReadLockTracer extends ReentrantReadWriteLock.ReadLock {
+        /** */
+        private static final long serialVersionUID = 0L;
+
         /** Delegate. */
-        private final Lock delegate;
+        private final ReentrantReadWriteLock.ReadLock delegate;
 
-        private final AtomicLong cnt = new AtomicLong();
+        /** */
+        private static final ThreadLocal<T2<Integer, Long>> READ_LOCK_HOLDER_TS =
+            ThreadLocal.withInitial(() -> new T2<>(0, 0L));
 
-        /** Count. */
-        private final ConcurrentMap<String, AtomicLong> cntMap = new ConcurrentHashMap<>();
+        /** */
+        private IgniteLogger log;
 
-        /**
-         * @param delegate Delegate.
-         */
-        public LockTracer(Lock delegate) {
-            this.delegate = delegate;
-        }
+        /** */
+        private long readLockThreshold;
 
-        /**
-         *
-         */
-        private void inc(){
-            cnt.incrementAndGet();
+        /** */
+        public ReadLockTracer(ReentrantReadWriteLock lock, IgniteLogger log, long readLockThreshold) {
+            super(lock);
 
-            String name = Thread.currentThread().getName();
+            delegate = lock.readLock();
 
-            AtomicLong cnt = cntMap.get(name);
+            this.log = log;
 
-            if (cnt == null) {
-                AtomicLong cnt0 = cntMap.putIfAbsent(name, cnt = new AtomicLong());
+            this.readLockThreshold = readLockThreshold;
+        }
 
-                if (cnt0 != null)
-                    cnt = cnt0;
-            }
+        /** */
+        private void inc() {
+            T2<Integer, Long> val = READ_LOCK_HOLDER_TS.get();
+
+            int cntr = val.get1();
+
+            if (cntr == 0)
+                val.set2(U.currentTimeMillis());
 
-            cnt.incrementAndGet();
+            val.set1(++cntr);
+
+            READ_LOCK_HOLDER_TS.set(val);
         }
 
-        /**
-         *
-         */
-        private void dec(){
-            cnt.decrementAndGet();
+        /** */
+        private void dec() {
+            T2<Integer, Long> val = READ_LOCK_HOLDER_TS.get();
+
+            int cntr = val.get1();
+
+            if (--cntr == 0) {
+                long timeout = U.currentTimeMillis() - val.get2();
 
-            String name = Thread.currentThread().getName();
+                if (timeout > readLockThreshold) {
+                    GridStringBuilder sb = new GridStringBuilder();
 
-            AtomicLong cnt = cntMap.get(name);
+                    sb.a(LOCK_HOLD_MESSAGE + timeout + " ms." + nl());
 
-            cnt.decrementAndGet();
+                    U.printStackTrace(Thread.currentThread().getId(), sb);
+
+                    U.warn(log, sb.toString());
+                }
+            }
+
+            val.set1(cntr);
+
+            READ_LOCK_HOLDER_TS.set(val);
         }
 
         /** {@inheritDoc} */
+        @SuppressWarnings("LockAcquiredButNotSafelyReleased")
         @Override public void lock() {
             delegate.lock();
 
@@ -11491,6 +11504,7 @@ public abstract class IgniteUtils {
         }
 
         /** {@inheritDoc} */
+        @SuppressWarnings("LockAcquiredButNotSafelyReleased")
         @Override public void lockInterruptibly() throws InterruptedException {
             delegate.lockInterruptibly();
 
@@ -11525,24 +11539,16 @@ public abstract class IgniteUtils {
 
             dec();
         }
+    }
 
-        /** {@inheritDoc} */
-        @NotNull @Override public Condition newCondition() {
-            return delegate.newCondition();
-        }
-
-        /**
-         *
-         */
-        public Map<String, AtomicLong> getLockUnlockCounters() {
-            return new HashMap<>(cntMap);
-        }
-
-        /**
-         *
-         */
-        public long getLockUnlockCounter() {
-            return cnt.get();
+    /** */
+    private static class WriteLockTracer extends ReentrantReadWriteLock.WriteLock {
+        /** */
+        private static final long serialVersionUID = 0L;
+        
+        /** */
+        public WriteLockTracer(ReentrantReadWriteLock lock) {
+            super(lock);
         }
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java
index 5f2a910..b15f6a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java
@@ -21,6 +21,8 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -30,18 +32,29 @@ import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS;
+import static org.apache.ignite.internal.util.IgniteUtils.LOCK_HOLD_MESSAGE;
+
 /**
  * Tests critical failure handling on checkpoint read lock acquisition errors.
  */
 public class CheckpointReadLockFailureTest extends GridCommonAbstractTest {
     /** */
+    private ListeningTestLogger testLog;
+
+    /** */
     private static final AbstractFailureHandler FAILURE_HND = new AbstractFailureHandler() {
         @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
             if (failureCtx.type() != FailureType.SYSTEM_CRITICAL_OPERATION_TIMEOUT)
@@ -59,13 +72,21 @@ public class CheckpointReadLockFailureTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
-        return super.getConfiguration(igniteInstanceName)
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
             .setFailureHandler(FAILURE_HND)
             .setDataStorageConfiguration(new DataStorageConfiguration()
                 .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                     .setPersistenceEnabled(true))
                 .setCheckpointFrequency(Integer.MAX_VALUE)
                 .setCheckpointReadLockTimeout(1));
+
+        if (testLog != null) {
+            cfg.setGridLogger(testLog);
+
+            testLog = null;
+        }
+
+        return cfg;
     }
 
     /**
@@ -136,4 +157,125 @@ public class CheckpointReadLockFailureTest extends GridCommonAbstractTest {
 
         stopGrid(0);
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS, value = "true")
+    public void testPrintCpRLockHolder() throws Exception {
+        CountDownLatch canRelease = new CountDownLatch(1);
+
+        testLog = new ListeningTestLogger(false, log);
+
+        LogListener lsnr = LogListener.matches(LOCK_HOLD_MESSAGE).build();
+
+        testLog.registerListener(lsnr);
+
+        IgniteEx ig = startGrid(0);
+
+        ig.cluster().active(true);
+
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig.context().cache().context().database();
+
+        U.ReentrantReadWriteLockTracer tracker = (U.ReentrantReadWriteLockTracer)db.checkpointLock;
+
+        GridTestUtils.runAsync(() -> {
+            db.checkpointLock.readLock().lock();
+
+            try {
+                canRelease.await(tracker.lockWaitThreshold() + 500, TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException e) {
+                e.printStackTrace();
+            } finally {
+                db.checkpointLock.readLock().unlock();
+            }
+        }, "async-runnable-runner-1");
+
+        assertTrue(GridTestUtils.waitForCondition(lsnr::check, tracker.lockWaitThreshold() + 1000));
+
+        stopGrid(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS, value = "true")
+    public void testReentrance() throws Exception {
+        IgniteEx ig = startGrid(0);
+
+        ig.cluster().active(true);
+
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig.context().cache().context().database();
+
+        ReentrantReadWriteLock rwLock = db.checkpointLock;
+
+        CountDownLatch waitFirstRLock = new CountDownLatch(1);
+
+        CountDownLatch waitSecondRLock = new CountDownLatch(1);
+
+        long timeout = 500L;
+
+        IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
+            //noinspection LockAcquiredButNotSafelyReleased
+            rwLock.readLock().lock();
+
+            //noinspection LockAcquiredButNotSafelyReleased
+            rwLock.readLock().lock();
+
+            rwLock.readLock().unlock();
+
+            waitFirstRLock.countDown();
+
+            try {
+                waitSecondRLock.await();
+            }
+            catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+            rwLock.readLock().unlock();
+        }, "async-runnable-runner-1");
+
+        IgniteInternalFuture f1 = GridTestUtils.runAsync(() -> {
+            try {
+                waitFirstRLock.await();
+            }
+            catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+            try {
+                rwLock.writeLock().tryLock();
+
+                assertFalse(GridTestUtils.waitForCondition(rwLock.writeLock()::isHeldByCurrentThread, timeout));
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                e.printStackTrace();
+            }
+
+            waitSecondRLock.countDown();
+
+            try {
+                rwLock.writeLock().tryLock(timeout, TimeUnit.MILLISECONDS);
+
+                assertTrue(rwLock.writeLock().isHeldByCurrentThread());
+            }
+            catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            finally {
+                rwLock.writeLock().unlock();
+            }
+
+        }, "async-runnable-runner-2");
+
+        f1.get(4 * timeout);
+
+        f0.get(4 * timeout);
+
+        stopGrid(0);
+    }
 }