You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/24 07:39:57 UTC

[iotdb] branch master updated: [IOTDB-5033] Fix deadlock in memory pool (#8123)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d9227e79fb [IOTDB-5033] Fix deadlock in memory pool (#8123)
d9227e79fb is described below

commit d9227e79fb7d8957bb92af394cfbcd2093eebf20
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Thu Nov 24 15:39:52 2022 +0800

    [IOTDB-5033] Fix deadlock in memory pool (#8123)
---
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  | 84 ++++++++++++++--------
 1 file changed, 55 insertions(+), 29 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index 11de85f863..8494fa4a23 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -25,18 +25,24 @@ import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
 /** A thread-safe memory pool. */
 public class MemoryPool {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(MemoryPool.class);
+
   public static class MemoryReservationFuture<V> extends AbstractFuture<V> {
     private final String queryId;
     private final long bytes;
@@ -180,41 +186,61 @@ public class MemoryPool {
     return ((MemoryReservationFuture<Void>) future).getBytes();
   }
 
-  public synchronized void free(String queryId, long bytes) {
-    Validate.notNull(queryId);
-    Validate.isTrue(bytes > 0L);
-
-    Long queryReservedBytes = queryMemoryReservations.get(queryId);
-    Validate.notNull(queryReservedBytes);
-    Validate.isTrue(bytes <= queryReservedBytes);
+  public void free(String queryId, long bytes) {
+    List<MemoryReservationFuture<Void>> futureList = new ArrayList<>();
+    synchronized (this) {
+      Validate.notNull(queryId);
+      Validate.isTrue(bytes > 0L);
 
-    queryReservedBytes -= bytes;
-    if (queryReservedBytes == 0) {
-      queryMemoryReservations.remove(queryId);
-    } else {
-      queryMemoryReservations.put(queryId, queryReservedBytes);
-    }
-    reservedBytes -= bytes;
+      Long queryReservedBytes = queryMemoryReservations.get(queryId);
+      Validate.notNull(queryReservedBytes);
+      Validate.isTrue(bytes <= queryReservedBytes);
 
-    if (memoryReservationFutures.isEmpty()) {
-      return;
-    }
-    Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
-    while (iterator.hasNext()) {
-      MemoryReservationFuture<Void> future = iterator.next();
-      if (future.isCancelled() || future.isDone()) {
-        continue;
+      queryReservedBytes -= bytes;
+      if (queryReservedBytes == 0) {
+        queryMemoryReservations.remove(queryId);
+      } else {
+        queryMemoryReservations.put(queryId, queryReservedBytes);
       }
-      long bytesToReserve = future.getBytes();
-      if (maxBytes - reservedBytes < bytesToReserve) {
+      reservedBytes -= bytes;
+
+      if (memoryReservationFutures.isEmpty()) {
         return;
       }
-      if (maxBytesPerQuery - queryMemoryReservations.getOrDefault(future.getQueryId(), 0L)
-          >= bytesToReserve) {
-        reservedBytes += bytesToReserve;
-        queryMemoryReservations.merge(future.getQueryId(), bytesToReserve, Long::sum);
+      Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
+      while (iterator.hasNext()) {
+        MemoryReservationFuture<Void> future = iterator.next();
+        if (future.isCancelled() || future.isDone()) {
+          continue;
+        }
+        long bytesToReserve = future.getBytes();
+        if (maxBytes - reservedBytes < bytesToReserve) {
+          return;
+        }
+        if (maxBytesPerQuery - queryMemoryReservations.getOrDefault(future.getQueryId(), 0L)
+            >= bytesToReserve) {
+          reservedBytes += bytesToReserve;
+          queryMemoryReservations.merge(future.getQueryId(), bytesToReserve, Long::sum);
+          futureList.add(future);
+          iterator.remove();
+        }
+      }
+    }
+
+    // why we need to put this outside MemoryPool's lock?
+    // If we put this block inside the MemoryPool's lock, we will get deadlock case like the
+    // following:
+    // Assuming that thread-A: LocalSourceHandle.receive() -> A-SharedTsBlockQueue.remove() ->
+    // MemoryPool.free() (hold MemoryPool's lock) -> future.set(null) -> try to get
+    // B-SharedTsBlockQueue's lock
+    // thread-B: LocalSourceHandle.receive() -> B-SharedTsBlockQueue.remove() (hold
+    // B-SharedTsBlockQueue's lock) -> try to get MemoryPool's lock
+    for (MemoryReservationFuture<Void> future : futureList) {
+      try {
         future.set(null);
-        iterator.remove();
+      } catch (Throwable t) {
+        // ignore it, because we still need to notify other future
+        LOGGER.error("error happened while trying to free memory: ", t);
       }
     }
   }