You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2019/12/17 13:09:46 UTC
[ignite] branch master updated: IGNITE-12434 Dump checkpoint
readLock holder threads if writeLock can`t take lock more than threshold
timeout. - Fixes #7124.
This is an automated email from the ASF dual-hosted git repository.
irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a09d128 IGNITE-12434 Dump checkpoint readLock holder threads if writeLock can`t take lock more than threshold timeout. - Fixes #7124.
a09d128 is described below
commit a09d1284a8f5cbaae7a8548af6c32ca422810035
Author: zstan <st...@gmail.com>
AuthorDate: Tue Dec 17 16:09:21 2019 +0300
IGNITE-12434 Dump checkpoint readLock holder threads if writeLock can`t take lock more than threshold timeout. - Fixes #7124.
Signed-off-by: Ivan Rakov <ir...@apache.org>
---
.../GridCacheDatabaseSharedManager.java | 11 +-
.../apache/ignite/internal/util/IgniteUtils.java | 188 +++++++++++----------
.../persistence/CheckpointReadLockFailureTest.java | 144 +++++++++++++++-
3 files changed, 250 insertions(+), 93 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 2eeb330..a89919b 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -226,6 +226,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** */
public static final String IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP = "IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP";
+ /** Log read lock holders. */
+ public static final String IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS = "IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS";
+
/** MemoryPolicyConfiguration name reserved for meta store. */
public static final String METASTORE_DATA_REGION_NAME = "metastoreMemPlc";
@@ -242,6 +245,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** */
private final boolean skipCheckpointOnNodeStop = getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false);
+ /** */
+ private final boolean logReadLockHolders = getBoolean(IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS);
+
/**
* Starting from this number of dirty pages in checkpoint, array will be sorted with
* {@link Arrays#parallelSort(Comparable[])} in case of {@link CheckpointWriteOrder#SEQUENTIAL}.
@@ -297,7 +303,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** For testing only. */
private volatile GridFutureAdapter<Void> enableChangeApplied;
- /** */
+ /** Checkpont lock. */
ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
/** */
@@ -544,6 +550,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
final GridKernalContext kernalCtx = cctx.kernalContext();
+ if (logReadLockHolders)
+ checkpointLock = new U.ReentrantReadWriteLockTracer(checkpointLock, kernalCtx, 5_000);
+
if (!kernalCtx.clientNode()) {
kernalCtx.internalSubscriptionProcessor().registerDatabaseListener(new MetastorageRecoveryLifecycle());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 34ef7be..1b51d45 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -136,9 +136,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.function.Consumer;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.jar.JarFile;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
@@ -234,6 +234,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -339,6 +340,9 @@ public abstract class IgniteUtils {
/** Default user version. */
public static final String DFLT_USER_VERSION = "0";
+ /** Lock hold message. */
+ public static final String LOCK_HOLD_MESSAGE = "ReadLock held the lock more than ";
+
/** Cache for {@link GridPeerDeployAware} fields to speed up reflection. */
private static final ConcurrentMap<String, IgniteBiTuple<Class<?>, Collection<Field>>> p2pFields =
new ConcurrentHashMap<>();
@@ -10824,20 +10828,6 @@ public abstract class IgniteUtils {
}
/**
- * @param lock Lock.
- */
- public static ReentrantReadWriteLockTracer lockTracer(ReadWriteLock lock) {
- return new ReentrantReadWriteLockTracer(lock);
- }
-
- /**
- * @param lock Lock.
- */
- public static LockTracer lockTracer(Lock lock) {
- return new LockTracer(lock);
- }
-
- /**
* Puts additional text to thread name.
* Calls {@code enhanceThreadName(Thread.currentThread(), text)}.
* For details see {@link #enhanceThreadName(Thread, String)}.
@@ -11388,102 +11378,125 @@ public abstract class IgniteUtils {
};
}
- /**
- *
- */
- public static class ReentrantReadWriteLockTracer implements ReadWriteLock {
+ /** */
+ public static class ReentrantReadWriteLockTracer extends ReentrantReadWriteLock {
+ /** */
+ private static final long serialVersionUID = 0L;
+
/** Read lock. */
- private final LockTracer readLock;
+ private final ReadLockTracer readLock;
/** Write lock. */
- private final LockTracer writeLock;
+ private final WriteLockTracer writeLock;
+
+ /** Lock print threshold. */
+ private long readLockThreshold;
+
+ /** */
+ private IgniteLogger log;
/**
- * @param delegate Delegate.
+ * @param delegate RWLock delegate.
+ * @param kctx Kernal context.
+ * @param readLockThreshold ReadLock threshold timeout.
+ *
*/
- public ReentrantReadWriteLockTracer(ReadWriteLock delegate) {
- readLock = new LockTracer(delegate.readLock());
- writeLock = new LockTracer(delegate.writeLock());
+ public ReentrantReadWriteLockTracer(ReentrantReadWriteLock delegate, GridKernalContext kctx, long readLockThreshold) {
+ log = kctx.cache().context().logger(getClass());
+
+ readLock = new ReadLockTracer(delegate, log, readLockThreshold);
+
+ writeLock = new WriteLockTracer(delegate);
+
+ this.readLockThreshold = readLockThreshold;
}
/** {@inheritDoc} */
- @NotNull @Override public Lock readLock() {
+ @Override public ReadLock readLock() {
return readLock;
}
/** {@inheritDoc} */
- @NotNull @Override public Lock writeLock() {
+ @Override public WriteLock writeLock() {
return writeLock;
}
- /**
- *
- */
- public LockTracer getReadLock() {
- return readLock;
- }
-
- /**
- *
- */
- public LockTracer getWriteLock() {
- return writeLock;
+ /** */
+ public long lockWaitThreshold() {
+ return readLockThreshold;
}
}
- /**
- *
- */
- public static class LockTracer implements Lock {
+ /** */
+ private static class ReadLockTracer extends ReentrantReadWriteLock.ReadLock {
+ /** */
+ private static final long serialVersionUID = 0L;
+
/** Delegate. */
- private final Lock delegate;
+ private final ReentrantReadWriteLock.ReadLock delegate;
- private final AtomicLong cnt = new AtomicLong();
+ /** */
+ private static final ThreadLocal<T2<Integer, Long>> READ_LOCK_HOLDER_TS =
+ ThreadLocal.withInitial(() -> new T2<>(0, 0L));
- /** Count. */
- private final ConcurrentMap<String, AtomicLong> cntMap = new ConcurrentHashMap<>();
+ /** */
+ private IgniteLogger log;
- /**
- * @param delegate Delegate.
- */
- public LockTracer(Lock delegate) {
- this.delegate = delegate;
- }
+ /** */
+ private long readLockThreshold;
- /**
- *
- */
- private void inc(){
- cnt.incrementAndGet();
+ /** */
+ public ReadLockTracer(ReentrantReadWriteLock lock, IgniteLogger log, long readLockThreshold) {
+ super(lock);
- String name = Thread.currentThread().getName();
+ delegate = lock.readLock();
- AtomicLong cnt = cntMap.get(name);
+ this.log = log;
- if (cnt == null) {
- AtomicLong cnt0 = cntMap.putIfAbsent(name, cnt = new AtomicLong());
+ this.readLockThreshold = readLockThreshold;
+ }
- if (cnt0 != null)
- cnt = cnt0;
- }
+ /** */
+ private void inc() {
+ T2<Integer, Long> val = READ_LOCK_HOLDER_TS.get();
+
+ int cntr = val.get1();
+
+ if (cntr == 0)
+ val.set2(U.currentTimeMillis());
- cnt.incrementAndGet();
+ val.set1(++cntr);
+
+ READ_LOCK_HOLDER_TS.set(val);
}
- /**
- *
- */
- private void dec(){
- cnt.decrementAndGet();
+ /** */
+ private void dec() {
+ T2<Integer, Long> val = READ_LOCK_HOLDER_TS.get();
+
+ int cntr = val.get1();
+
+ if (--cntr == 0) {
+ long timeout = U.currentTimeMillis() - val.get2();
- String name = Thread.currentThread().getName();
+ if (timeout > readLockThreshold) {
+ GridStringBuilder sb = new GridStringBuilder();
- AtomicLong cnt = cntMap.get(name);
+ sb.a(LOCK_HOLD_MESSAGE + timeout + " ms." + nl());
- cnt.decrementAndGet();
+ U.printStackTrace(Thread.currentThread().getId(), sb);
+
+ U.warn(log, sb.toString());
+ }
+ }
+
+ val.set1(cntr);
+
+ READ_LOCK_HOLDER_TS.set(val);
}
/** {@inheritDoc} */
+ @SuppressWarnings("LockAcquiredButNotSafelyReleased")
@Override public void lock() {
delegate.lock();
@@ -11491,6 +11504,7 @@ public abstract class IgniteUtils {
}
/** {@inheritDoc} */
+ @SuppressWarnings("LockAcquiredButNotSafelyReleased")
@Override public void lockInterruptibly() throws InterruptedException {
delegate.lockInterruptibly();
@@ -11525,24 +11539,16 @@ public abstract class IgniteUtils {
dec();
}
+ }
- /** {@inheritDoc} */
- @NotNull @Override public Condition newCondition() {
- return delegate.newCondition();
- }
-
- /**
- *
- */
- public Map<String, AtomicLong> getLockUnlockCounters() {
- return new HashMap<>(cntMap);
- }
-
- /**
- *
- */
- public long getLockUnlockCounter() {
- return cnt.get();
+ /** */
+ private static class WriteLockTracer extends ReentrantReadWriteLock.WriteLock {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ public WriteLockTracer(ReentrantReadWriteLock lock) {
+ super(lock);
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java
index 5f2a910..b15f6a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointReadLockFailureTest.java
@@ -21,6 +21,8 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -30,18 +32,29 @@ import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS;
+import static org.apache.ignite.internal.util.IgniteUtils.LOCK_HOLD_MESSAGE;
+
/**
* Tests critical failure handling on checkpoint read lock acquisition errors.
*/
public class CheckpointReadLockFailureTest extends GridCommonAbstractTest {
/** */
+ private ListeningTestLogger testLog;
+
+ /** */
private static final AbstractFailureHandler FAILURE_HND = new AbstractFailureHandler() {
@Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
if (failureCtx.type() != FailureType.SYSTEM_CRITICAL_OPERATION_TIMEOUT)
@@ -59,13 +72,21 @@ public class CheckpointReadLockFailureTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- return super.getConfiguration(igniteInstanceName)
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
.setFailureHandler(FAILURE_HND)
.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(true))
.setCheckpointFrequency(Integer.MAX_VALUE)
.setCheckpointReadLockTimeout(1));
+
+ if (testLog != null) {
+ cfg.setGridLogger(testLog);
+
+ testLog = null;
+ }
+
+ return cfg;
}
/**
@@ -136,4 +157,125 @@ public class CheckpointReadLockFailureTest extends GridCommonAbstractTest {
stopGrid(0);
}
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS, value = "true")
+ public void testPrintCpRLockHolder() throws Exception {
+ CountDownLatch canRelease = new CountDownLatch(1);
+
+ testLog = new ListeningTestLogger(false, log);
+
+ LogListener lsnr = LogListener.matches(LOCK_HOLD_MESSAGE).build();
+
+ testLog.registerListener(lsnr);
+
+ IgniteEx ig = startGrid(0);
+
+ ig.cluster().active(true);
+
+ GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig.context().cache().context().database();
+
+ U.ReentrantReadWriteLockTracer tracker = (U.ReentrantReadWriteLockTracer)db.checkpointLock;
+
+ GridTestUtils.runAsync(() -> {
+ db.checkpointLock.readLock().lock();
+
+ try {
+ canRelease.await(tracker.lockWaitThreshold() + 500, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ db.checkpointLock.readLock().unlock();
+ }
+ }, "async-runnable-runner-1");
+
+ assertTrue(GridTestUtils.waitForCondition(lsnr::check, tracker.lockWaitThreshold() + 1000));
+
+ stopGrid(0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ @WithSystemProperty(key = IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS, value = "true")
+ public void testReentrance() throws Exception {
+ IgniteEx ig = startGrid(0);
+
+ ig.cluster().active(true);
+
+ GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig.context().cache().context().database();
+
+ ReentrantReadWriteLock rwLock = db.checkpointLock;
+
+ CountDownLatch waitFirstRLock = new CountDownLatch(1);
+
+ CountDownLatch waitSecondRLock = new CountDownLatch(1);
+
+ long timeout = 500L;
+
+ IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
+ //noinspection LockAcquiredButNotSafelyReleased
+ rwLock.readLock().lock();
+
+ //noinspection LockAcquiredButNotSafelyReleased
+ rwLock.readLock().lock();
+
+ rwLock.readLock().unlock();
+
+ waitFirstRLock.countDown();
+
+ try {
+ waitSecondRLock.await();
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ rwLock.readLock().unlock();
+ }, "async-runnable-runner-1");
+
+ IgniteInternalFuture f1 = GridTestUtils.runAsync(() -> {
+ try {
+ waitFirstRLock.await();
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ try {
+ rwLock.writeLock().tryLock();
+
+ assertFalse(GridTestUtils.waitForCondition(rwLock.writeLock()::isHeldByCurrentThread, timeout));
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ e.printStackTrace();
+ }
+
+ waitSecondRLock.countDown();
+
+ try {
+ rwLock.writeLock().tryLock(timeout, TimeUnit.MILLISECONDS);
+
+ assertTrue(rwLock.writeLock().isHeldByCurrentThread());
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ finally {
+ rwLock.writeLock().unlock();
+ }
+
+ }, "async-runnable-runner-2");
+
+ f1.get(4 * timeout);
+
+ f0.get(4 * timeout);
+
+ stopGrid(0);
+ }
}