You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2019/08/30 17:11:12 UTC
[ignite] branch master updated: IGNITE-12125 Concurrency problem in PagesWriteThrottle - Fixes #6826.
This is an automated email from the ASF dual-hosted git repository.
irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new fcec4e1 IGNITE-12125 Concurrency problem in PagesWriteThrottle - Fixes #6826.
fcec4e1 is described below
commit fcec4e125b514bc2f2e8f7c2165bc3c5b12a2352
Author: Sergey Antonov <an...@gmail.com>
AuthorDate: Fri Aug 30 20:10:54 2019 +0300
IGNITE-12125 Concurrency problem in PagesWriteThrottle - Fixes #6826.
Signed-off-by: Ivan Rakov <ir...@apache.org>
---
.../persistence/pagemem/PagesWriteThrottle.java | 39 ++++++++++++++--------
1 file changed, 25 insertions(+), 14 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
index 1586599..b2a0ec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
@@ -16,8 +16,7 @@
*/
package org.apache.ignite.internal.processors.cache.persistence.pagemem;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteLogger;
@@ -60,8 +59,8 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
/** Logger. */
private IgniteLogger log;
- /** Currently parking threads. */
- private final Collection<Thread> parkThrds = new ConcurrentLinkedQueue<>();
+ /** Threads that are throttled due to checkpoint buffer overflow. */
+ private final ConcurrentHashMap<Long, Thread> cpBufThrottledThreads = new ConcurrentHashMap<>();
/**
* @param pageMemory Page memory.
@@ -126,23 +125,36 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
long throttleParkTimeNs = (long) (STARTING_THROTTLE_NANOS * Math.pow(BACKOFF_RATIO, throttleLevel));
+ Thread curThread = Thread.currentThread();
+
if (throttleParkTimeNs > LOGGING_THRESHOLD) {
- U.warn(log, "Parking thread=" + Thread.currentThread().getName()
+ U.warn(log, "Parking thread=" + curThread.getName()
+ " for timeout(ms)=" + (throttleParkTimeNs / 1_000_000));
}
- if (isPageInCheckpoint)
- parkThrds.add(Thread.currentThread());
+ if (isPageInCheckpoint) {
+ cpBufThrottledThreads.put(curThread.getId(), curThread);
+
+ try {
+ LockSupport.parkNanos(throttleParkTimeNs);
+ }
+ finally {
+ cpBufThrottledThreads.remove(curThread.getId());
- LockSupport.parkNanos(throttleParkTimeNs);
+ if (throttleParkTimeNs > LOGGING_THRESHOLD) {
+ U.warn(log, "Unparking thread=" + curThread.getName()
+ + " with park timeout(ms)=" + (throttleParkTimeNs / 1_000_000));
+ }
+ }
+ }
+ else
+ LockSupport.parkNanos(throttleParkTimeNs);
}
else {
int oldCntr = cntr.getAndSet(0);
- if (isPageInCheckpoint && oldCntr != 0) {
- parkThrds.forEach(LockSupport::unpark);
- parkThrds.clear();
- }
+ if (isPageInCheckpoint && oldCntr != 0)
+ cpBufThrottledThreads.values().forEach(LockSupport::unpark);
}
}
@@ -151,8 +163,7 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
if (!shouldThrottle()) {
inCheckpointBackoffCntr.set(0);
- parkThrds.forEach(LockSupport::unpark);
- parkThrds.clear();
+ cpBufThrottledThreads.values().forEach(LockSupport::unpark);
}
}