You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2019/08/30 17:11:12 UTC

[ignite] branch master updated: IGNITE-12125 Concurrency problem in PagesWriteThrottle - Fixes #6826.

This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new fcec4e1  IGNITE-12125 Concurrency problem in PagesWriteThrottle - Fixes #6826.
fcec4e1 is described below

commit fcec4e125b514bc2f2e8f7c2165bc3c5b12a2352
Author: Sergey Antonov <an...@gmail.com>
AuthorDate: Fri Aug 30 20:10:54 2019 +0300

    IGNITE-12125 Concurrency problem in PagesWriteThrottle - Fixes #6826.
    
    Signed-off-by: Ivan Rakov <ir...@apache.org>
---
 .../persistence/pagemem/PagesWriteThrottle.java    | 39 ++++++++++++++--------
 1 file changed, 25 insertions(+), 14 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
index 1586599..b2a0ec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
@@ -16,8 +16,7 @@
 */
 package org.apache.ignite.internal.processors.cache.persistence.pagemem;
 
-import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteLogger;
@@ -60,8 +59,8 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
     /** Logger. */
     private IgniteLogger log;
 
-    /** Currently parking threads. */
-    private final Collection<Thread> parkThrds = new ConcurrentLinkedQueue<>();
+    /** Threads that are throttled due to checkpoint buffer overflow. */
+    private final ConcurrentHashMap<Long, Thread> cpBufThrottledThreads = new ConcurrentHashMap<>();
 
     /**
      * @param pageMemory Page memory.
@@ -126,23 +125,36 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
 
             long throttleParkTimeNs = (long) (STARTING_THROTTLE_NANOS * Math.pow(BACKOFF_RATIO, throttleLevel));
 
+            Thread curThread = Thread.currentThread();
+
             if (throttleParkTimeNs > LOGGING_THRESHOLD) {
-                U.warn(log, "Parking thread=" + Thread.currentThread().getName()
+                U.warn(log, "Parking thread=" + curThread.getName()
                     + " for timeout(ms)=" + (throttleParkTimeNs / 1_000_000));
             }
 
-            if (isPageInCheckpoint)
-                parkThrds.add(Thread.currentThread());
+            if (isPageInCheckpoint) {
+                cpBufThrottledThreads.put(curThread.getId(), curThread);
+
+                try {
+                    LockSupport.parkNanos(throttleParkTimeNs);
+                }
+                finally {
+                    cpBufThrottledThreads.remove(curThread.getId());
 
-            LockSupport.parkNanos(throttleParkTimeNs);
+                    if (throttleParkTimeNs > LOGGING_THRESHOLD) {
+                        U.warn(log, "Unparking thread=" + curThread.getName()
+                            + " with park timeout(ms)=" + (throttleParkTimeNs / 1_000_000));
+                    }
+                }
+            }
+            else
+                LockSupport.parkNanos(throttleParkTimeNs);
         }
         else {
             int oldCntr = cntr.getAndSet(0);
 
-            if (isPageInCheckpoint && oldCntr != 0) {
-                parkThrds.forEach(LockSupport::unpark);
-                parkThrds.clear();
-            }
+            if (isPageInCheckpoint && oldCntr != 0)
+                cpBufThrottledThreads.values().forEach(LockSupport::unpark);
         }
     }
 
@@ -151,8 +163,7 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
         if (!shouldThrottle()) {
             inCheckpointBackoffCntr.set(0);
 
-            parkThrds.forEach(LockSupport::unpark);
-            parkThrds.clear();
+            cpBufThrottledThreads.values().forEach(LockSupport::unpark);
         }
     }