You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/03/23 15:38:49 UTC
[iotdb] branch master updated: [IOTDB-5720] Fix release processor fail to release memory due to writer-preferred starvation (#9431)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 43d97732bb [IOTDB-5720] Fix release processor fail to release memory due to writer-preferred starvation (#9431)
43d97732bb is described below
commit 43d97732bbb2902ea8956572c071532edc38cb34
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Thu Mar 23 23:38:40 2023 +0800
[IOTDB-5720] Fix release processor fail to release memory due to writer-preferred starvation (#9431)
---
.../mtree/store/StampedWriterPreferredLock.java | 21 +++-
.../mtree/store/disk/cache/CacheMemoryManager.java | 116 ++++++++++-----------
.../mtree/lock/StampedWriterPreferredLockTest.java | 16 ++-
3 files changed, 84 insertions(+), 69 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/StampedWriterPreferredLock.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/StampedWriterPreferredLock.java
index 5ec3841062..32c23bb75b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/StampedWriterPreferredLock.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/StampedWriterPreferredLock.java
@@ -68,7 +68,7 @@ public class StampedWriterPreferredLock {
public long stampedReadLock() {
lock.lock();
try {
- return acquireReadLockStamp();
+ return acquireReadLockStamp(false);
} finally {
lock.unlock();
}
@@ -80,12 +80,24 @@ public class StampedWriterPreferredLock {
* wait if another thread holds a write lock or the write lock waiting queue is not empty.
*/
public void threadReadLock() {
+ threadReadLock(false);
+ }
+
+ /**
+ * Acquires the thread-bound read lock. Read lock acquire and release is thread-bound and supports
+ * re-entry within the same thread. Return directly if no thread holds a write lock ; block and
+ * wait if another thread holds a write lock.
+ *
+ * @param prior If false, it will also block and * wait if the write lock waiting queue is not
+ * empty.
+ */
+ public void threadReadLock(boolean prior) {
lock.lock();
try {
Long allocateStamp = sharedOwnerStamp.get();
if (allocateStamp == null) {
// first time entry, acquire read lock and set thread local
- sharedOwnerStamp.set(acquireReadLockStamp());
+ sharedOwnerStamp.set(acquireReadLockStamp(prior));
} else {
// reentry, add read count
readCnt.put(allocateStamp, readCnt.get(allocateStamp) + 1);
@@ -99,10 +111,11 @@ public class StampedWriterPreferredLock {
* Acquires a read lock and block and wait if another thread holds a write lock or the write lock
* waiting queue is not empty.
*
+ * @param prior if the write lock waiting queue can be ignored
* @return read lock stamp
*/
- private long acquireReadLockStamp() {
- if (writeCnt + writeWait > 0) {
+ private long acquireReadLockStamp(boolean prior) {
+ if ((prior ? writeCnt : writeCnt + writeWait) > 0) {
readWait++;
try {
okToRead.await();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
index 2085f8febd..51717184aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
@@ -35,9 +35,9 @@ import org.apache.iotdb.db.utils.concurrent.FiniteSemaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
/**
@@ -49,7 +49,7 @@ public class CacheMemoryManager {
private static final Logger logger = LoggerFactory.getLogger(CacheMemoryManager.class);
- private final List<CachedMTreeStore> storeList = new ArrayList<>();
+ private final List<CachedMTreeStore> storeList = new CopyOnWriteArrayList<>();
private CachedSchemaEngineStatistics engineStatistics;
private SchemaEngineCachedMetric engineMetric;
@@ -80,11 +80,9 @@ public class CacheMemoryManager {
* @return LRUCacheManager
*/
public ICacheManager createLRUCacheManager(CachedMTreeStore store, MemManager memManager) {
- synchronized (storeList) {
- ICacheManager cacheManager = new LRUCacheManager(memManager);
- storeList.add(store);
- return cacheManager;
- }
+ ICacheManager cacheManager = new LRUCacheManager(memManager);
+ storeList.add(store);
+ return cacheManager;
}
public void init(ISchemaEngineStatistics engineStatistics) {
@@ -200,34 +198,32 @@ public class CacheMemoryManager {
* added or updated, fire flush task.
*/
private void tryExecuteMemoryRelease() {
- synchronized (storeList) {
- long startTime = System.currentTimeMillis();
- CompletableFuture.allOf(
- storeList.stream()
- .map(
- store ->
- CompletableFuture.runAsync(
- () -> {
- store.getLock().threadReadLock();
- try {
- executeMemoryRelease(store);
- } finally {
- store.getLock().threadReadUnlock();
- }
- },
- releaseTaskProcessor))
- .toArray(CompletableFuture[]::new))
- .join();
- if (engineMetric != null) {
- engineMetric.recordRelease(System.currentTimeMillis() - startTime);
- }
- synchronized (blockObject) {
- hasReleaseTask = false;
- if (isExceedFlushThreshold()) {
- registerFlushTask();
- } else {
- blockObject.notifyAll();
- }
+ long startTime = System.currentTimeMillis();
+ CompletableFuture.allOf(
+ storeList.stream()
+ .map(
+ store ->
+ CompletableFuture.runAsync(
+ () -> {
+ store.getLock().threadReadLock();
+ try {
+ executeMemoryRelease(store);
+ } finally {
+ store.getLock().threadReadUnlock();
+ }
+ },
+ releaseTaskProcessor))
+ .toArray(CompletableFuture[]::new))
+ .join();
+ if (engineMetric != null) {
+ engineMetric.recordRelease(System.currentTimeMillis() - startTime);
+ }
+ synchronized (blockObject) {
+ hasReleaseTask = false;
+ if (isExceedFlushThreshold()) {
+ registerFlushTask();
+ } else {
+ blockObject.notifyAll();
}
}
}
@@ -252,32 +248,30 @@ public class CacheMemoryManager {
/** Sync all volatile nodes to schemaFile and execute memory release after flush. */
private void tryFlushVolatileNodes() {
- synchronized (storeList) {
- long startTime = System.currentTimeMillis();
- CompletableFuture.allOf(
- storeList.stream()
- .map(
- store ->
- CompletableFuture.runAsync(
- () -> {
- store.getLock().writeLock();
- try {
- store.flushVolatileNodes();
- executeMemoryRelease(store);
- } finally {
- store.getLock().unlockWrite();
- }
- },
- flushTaskProcessor))
- .toArray(CompletableFuture[]::new))
- .join();
- if (engineMetric != null) {
- engineMetric.recordFlush(System.currentTimeMillis() - startTime);
- }
- synchronized (blockObject) {
- hasFlushTask = false;
- blockObject.notifyAll();
- }
+ long startTime = System.currentTimeMillis();
+ CompletableFuture.allOf(
+ storeList.stream()
+ .map(
+ store ->
+ CompletableFuture.runAsync(
+ () -> {
+ store.getLock().writeLock();
+ try {
+ store.flushVolatileNodes();
+ executeMemoryRelease(store);
+ } finally {
+ store.getLock().unlockWrite();
+ }
+ },
+ flushTaskProcessor))
+ .toArray(CompletableFuture[]::new))
+ .join();
+ if (engineMetric != null) {
+ engineMetric.recordFlush(System.currentTimeMillis() - startTime);
+ }
+ synchronized (blockObject) {
+ hasFlushTask = false;
+ blockObject.notifyAll();
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
index 39d46e59aa..6e0cbc04b3 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/lock/StampedWriterPreferredLockTest.java
@@ -190,6 +190,14 @@ public class StampedWriterPreferredLockTest {
lock.threadReadUnlock();
})
.start();
+ new Thread(
+ () -> {
+ // it will not be blocked because of priority
+ lock.threadReadLock(true);
+ counter.incrementAndGet();
+ lock.threadReadUnlock();
+ })
+ .start();
new Thread(
() -> {
// it will be blocked because of writer preferred
@@ -199,14 +207,14 @@ public class StampedWriterPreferredLockTest {
})
.start();
try {
- Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> counter.get() == 3);
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> counter.get() == 4);
Assert.fail();
} catch (ConditionTimeoutException e) {
- Assert.assertEquals(0, counter.get());
+ Assert.assertEquals(1, counter.get());
}
// release main read lock
lock.stampedReadUnlock(stamp);
- Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> counter.get() == 3);
- Assert.assertEquals(3, counter.get());
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> counter.get() == 4);
+ Assert.assertEquals(4, counter.get());
}
}