You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/03/23 15:38:49 UTC

[iotdb] branch master updated: [IOTDB-5720] Fix release processor fail to release memory due to writer-preferred starvation (#9431)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 43d97732bb [IOTDB-5720] Fix release processor fail to release memory due to writer-preferred starvation (#9431)
43d97732bb is described below

commit 43d97732bbb2902ea8956572c071532edc38cb34
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Thu Mar 23 23:38:40 2023 +0800

    [IOTDB-5720] Fix release processor fail to release memory due to writer-preferred starvation (#9431)
---
 .../mtree/store/StampedWriterPreferredLock.java    |  21 +++-
 .../mtree/store/disk/cache/CacheMemoryManager.java | 116 ++++++++++-----------
 .../mtree/lock/StampedWriterPreferredLockTest.java |  16 ++-
 3 files changed, 84 insertions(+), 69 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/StampedWriterPreferredLock.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/StampedWriterPreferredLock.java
index 5ec3841062..32c23bb75b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/StampedWriterPreferredLock.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/StampedWriterPreferredLock.java
@@ -68,7 +68,7 @@ public class StampedWriterPreferredLock {
   public long stampedReadLock() {
     lock.lock();
     try {
-      return acquireReadLockStamp();
+      return acquireReadLockStamp(false);
     } finally {
       lock.unlock();
     }
@@ -80,12 +80,24 @@ public class StampedWriterPreferredLock {
    * wait if another thread holds a write lock or the write lock waiting queue is not empty.
    */
   public void threadReadLock() {
+    threadReadLock(false);
+  }
+
+  /**
+   * Acquires the thread-bound read lock. Read lock acquire and release is thread-bound and supports
+   * re-entry within the same thread. Return directly if no thread holds a write lock; block and
+   * wait if another thread holds a write lock.
+   *
+   * @param prior if false, also block and wait while the write lock waiting queue is not empty;
+   *     if true, ignore the waiting queue and wait only while a write lock is held.
+   */
+  public void threadReadLock(boolean prior) {
     lock.lock();
     try {
       Long allocateStamp = sharedOwnerStamp.get();
       if (allocateStamp == null) {
         // first time entry, acquire read lock and set thread local
-        sharedOwnerStamp.set(acquireReadLockStamp());
+        sharedOwnerStamp.set(acquireReadLockStamp(prior));
       } else {
         // reentry, add read count
         readCnt.put(allocateStamp, readCnt.get(allocateStamp) + 1);
@@ -99,10 +111,11 @@ public class StampedWriterPreferredLock {
    * Acquires a read lock and block and wait if another thread holds a write lock or the write lock
    * waiting queue is not empty.
    *
+   * @param prior if true, ignore the write lock waiting queue and wait only for a held write lock
    * @return read lock stamp
    */
-  private long acquireReadLockStamp() {
-    if (writeCnt + writeWait > 0) {
+  private long acquireReadLockStamp(boolean prior) {
+    if ((prior ? writeCnt : writeCnt + writeWait) > 0) {
       readWait++;
       try {
         okToRead.await();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
index 2085f8febd..51717184aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
@@ -35,9 +35,9 @@ import org.apache.iotdb.db.utils.concurrent.FiniteSemaphore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -49,7 +49,7 @@ public class CacheMemoryManager {
 
   private static final Logger logger = LoggerFactory.getLogger(CacheMemoryManager.class);
 
-  private final List<CachedMTreeStore> storeList = new ArrayList<>();
+  private final List<CachedMTreeStore> storeList = new CopyOnWriteArrayList<>();
 
   private CachedSchemaEngineStatistics engineStatistics;
   private SchemaEngineCachedMetric engineMetric;
@@ -80,11 +80,9 @@ public class CacheMemoryManager {
    * @return LRUCacheManager
    */
   public ICacheManager createLRUCacheManager(CachedMTreeStore store, MemManager memManager) {
-    synchronized (storeList) {
-      ICacheManager cacheManager = new LRUCacheManager(memManager);
-      storeList.add(store);
-      return cacheManager;
-    }
+    ICacheManager cacheManager = new LRUCacheManager(memManager);
+    storeList.add(store);
+    return cacheManager;
   }
 
   public void init(ISchemaEngineStatistics engineStatistics) {
@@ -200,34 +198,32 @@ public class CacheMemoryManager {
    * added or updated, fire flush task.
    */
   private void tryExecuteMemoryRelease() {
-    synchronized (storeList) {
-      long startTime = System.currentTimeMillis();
-      CompletableFuture.allOf(
-              storeList.stream()
-                  .map(
-                      store ->
-                          CompletableFuture.runAsync(
-                              () -> {
-                                store.getLock().threadReadLock();
-                                try {
-                                  executeMemoryRelease(store);
-                                } finally {
-                                  store.getLock().threadReadUnlock();
-                                }
-                              },
-                              releaseTaskProcessor))
-                  .toArray(CompletableFuture[]::new))
-          .join();
-      if (engineMetric != null) {
-        engineMetric.recordRelease(System.currentTimeMillis() - startTime);
-      }
-      synchronized (blockObject) {
-        hasReleaseTask = false;
-        if (isExceedFlushThreshold()) {
-          registerFlushTask();
-        } else {
-          blockObject.notifyAll();
-        }
+    long startTime = System.currentTimeMillis();
+    CompletableFuture.allOf(
+            storeList.stream()
+                .map(
+                    store ->
+                        CompletableFuture.runAsync(
+                            () -> {
+                              store.getLock().threadReadLock();
+                              try {
+                                executeMemoryRelease(store);
+                              } finally {
+                                store.getLock().threadReadUnlock();
+                              }
+                            },
+                            releaseTaskProcessor))
+                .toArray(CompletableFuture[]::new))
+        .join();
+    if (engineMetric != null) {
+      engineMetric.recordRelease(System.currentTimeMillis() - startTime);
+    }
+    synchronized (blockObject) {
+      hasReleaseTask = false;
+      if (isExceedFlushThreshold()) {
+        registerFlushTask();
+      } else {
+        blockObject.notifyAll();
       }
     }
   }
@@ -252,32 +248,30 @@ public class CacheMemoryManager {
 
   /** Sync all volatile nodes to schemaFile and execute memory release after flush. */
   private void tryFlushVolatileNodes() {
-    synchronized (storeList) {
-      long startTime = System.currentTimeMillis();
-      CompletableFuture.allOf(
-              storeList.stream()
-                  .map(
-                      store ->
-                          CompletableFuture.runAsync(
-                              () -> {
-                                store.getLock().writeLock();
-                                try {
-                                  store.flushVolatileNodes();
-                                  executeMemoryRelease(store);
-                                } finally {
-                                  store.getLock().unlockWrite();
-                                }
-                              },
-                              flushTaskProcessor))
-                  .toArray(CompletableFuture[]::new))
-          .join();
-      if (engineMetric != null) {
-        engineMetric.recordFlush(System.currentTimeMillis() - startTime);
-      }
-      synchronized (blockObject) {
-        hasFlushTask = false;
-        blockObject.notifyAll();
-      }
+    long startTime = System.currentTimeMillis();
+    CompletableFuture.allOf(
+            storeList.stream()
+                .map(
+                    store ->
+                        CompletableFuture.runAsync(
+                            () -> {
+                              store.getLock().writeLock();
+                              try {
+                                store.flushVolatileNodes();
+                                executeMemoryRelease(store);
+                              } finally {
+                                store.getLock().unlockWrite();
+                              }
+                            },
+                            flushTaskProcessor))
+                .toArray(CompletableFuture[]::new))
+        .join();
+    if (engineMetric != null) {
+      engineMetric.recordFlush(System.currentTimeMillis() - startTime);
+    }
+    synchronized (blockObject) {
+      hasFlushTask = false;
+      blockObject.notifyAll();
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
index 39d46e59aa..6e0cbc04b3 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
@@ -190,6 +190,14 @@ public class StampedWriterPreferredLockTest {
               lock.threadReadUnlock();
             })
         .start();
+    new Thread(
+            () -> {
+              // it will not be blocked because of priority
+              lock.threadReadLock(true);
+              counter.incrementAndGet();
+              lock.threadReadUnlock();
+            })
+        .start();
     new Thread(
             () -> {
               // it will be blocked because of writer preferred
@@ -199,14 +207,14 @@ public class StampedWriterPreferredLockTest {
             })
         .start();
     try {
-      Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> counter.get() == 3);
+      Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> counter.get() == 4);
       Assert.fail();
     } catch (ConditionTimeoutException e) {
-      Assert.assertEquals(0, counter.get());
+      Assert.assertEquals(1, counter.get());
     }
     // release main read lock
     lock.stampedReadUnlock(stamp);
-    Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> counter.get() == 3);
-    Assert.assertEquals(3, counter.get());
+    Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> counter.get() == 4);
+    Assert.assertEquals(4, counter.get());
   }
 }