You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/24 03:24:19 UTC

[iotdb] branch ty/DeadLockMem created (now 10cab0272c)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch ty/DeadLockMem
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 10cab0272c [IOTDB-5033] Fix deadlock in memory pool

This branch includes the following new commits:

     new 10cab0272c [IOTDB-5033] Fix deadlock in memory pool

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-5033] Fix deadlock in memory pool

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/DeadLockMem
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 10cab0272cbada2edc957502e9c2bca0c54e0815
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Nov 24 11:24:05 2022 +0800

    [IOTDB-5033] Fix deadlock in memory pool
---
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  | 84 ++++++++++++++--------
 1 file changed, 55 insertions(+), 29 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index 11de85f863..8494fa4a23 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -25,18 +25,24 @@ import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
 /** A thread-safe memory pool. */
 public class MemoryPool {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(MemoryPool.class);
+
   public static class MemoryReservationFuture<V> extends AbstractFuture<V> {
     private final String queryId;
     private final long bytes;
@@ -180,41 +186,61 @@ public class MemoryPool {
     return ((MemoryReservationFuture<Void>) future).getBytes();
   }
 
-  public synchronized void free(String queryId, long bytes) {
-    Validate.notNull(queryId);
-    Validate.isTrue(bytes > 0L);
-
-    Long queryReservedBytes = queryMemoryReservations.get(queryId);
-    Validate.notNull(queryReservedBytes);
-    Validate.isTrue(bytes <= queryReservedBytes);
+  public void free(String queryId, long bytes) {
+    List<MemoryReservationFuture<Void>> futureList = new ArrayList<>();
+    synchronized (this) {
+      Validate.notNull(queryId);
+      Validate.isTrue(bytes > 0L);
 
-    queryReservedBytes -= bytes;
-    if (queryReservedBytes == 0) {
-      queryMemoryReservations.remove(queryId);
-    } else {
-      queryMemoryReservations.put(queryId, queryReservedBytes);
-    }
-    reservedBytes -= bytes;
+      Long queryReservedBytes = queryMemoryReservations.get(queryId);
+      Validate.notNull(queryReservedBytes);
+      Validate.isTrue(bytes <= queryReservedBytes);
 
-    if (memoryReservationFutures.isEmpty()) {
-      return;
-    }
-    Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
-    while (iterator.hasNext()) {
-      MemoryReservationFuture<Void> future = iterator.next();
-      if (future.isCancelled() || future.isDone()) {
-        continue;
+      queryReservedBytes -= bytes;
+      if (queryReservedBytes == 0) {
+        queryMemoryReservations.remove(queryId);
+      } else {
+        queryMemoryReservations.put(queryId, queryReservedBytes);
       }
-      long bytesToReserve = future.getBytes();
-      if (maxBytes - reservedBytes < bytesToReserve) {
+      reservedBytes -= bytes;
+
+      if (memoryReservationFutures.isEmpty()) {
         return;
       }
-      if (maxBytesPerQuery - queryMemoryReservations.getOrDefault(future.getQueryId(), 0L)
-          >= bytesToReserve) {
-        reservedBytes += bytesToReserve;
-        queryMemoryReservations.merge(future.getQueryId(), bytesToReserve, Long::sum);
+      Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
+      while (iterator.hasNext()) {
+        MemoryReservationFuture<Void> future = iterator.next();
+        if (future.isCancelled() || future.isDone()) {
+          continue;
+        }
+        long bytesToReserve = future.getBytes();
+        if (maxBytes - reservedBytes < bytesToReserve) {
+          return;
+        }
+        if (maxBytesPerQuery - queryMemoryReservations.getOrDefault(future.getQueryId(), 0L)
+            >= bytesToReserve) {
+          reservedBytes += bytesToReserve;
+          queryMemoryReservations.merge(future.getQueryId(), bytesToReserve, Long::sum);
+          futureList.add(future);
+          iterator.remove();
+        }
+      }
+    }
+
+    // why we need to put this outside MemoryPool's lock?
+    // If we put this block inside the MemoryPool's lock, we will get deadlock case like the
+    // following:
+    // Assuming that thread-A: LocalSourceHandle.receive() -> A-SharedTsBlockQueue.remove() ->
+    // MemoryPool.free() (hold MemoryPool's lock) -> future.set(null) -> try to get
+    // B-SharedTsBlockQueue's lock
+    // thread-B: LocalSourceHandle.receive() -> B-SharedTsBlockQueue.remove() (hold
+    // B-SharedTsBlockQueue's lock) -> try to get MemoryPool's lock
+    for (MemoryReservationFuture<Void> future : futureList) {
+      try {
         future.set(null);
-        iterator.remove();
+      } catch (Throwable t) {
+        // ignore it, because we still need to notify other future
+        LOGGER.error("error happened while trying to free memory: ", t);
       }
     }
   }