You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/24 07:39:57 UTC
[iotdb] branch master updated: [IOTDB-5033] Fix deadlock in memory pool (#8123)
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d9227e79fb [IOTDB-5033] Fix deadlock in memory pool (#8123)
d9227e79fb is described below
commit d9227e79fb7d8957bb92af394cfbcd2093eebf20
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Thu Nov 24 15:39:52 2022 +0800
[IOTDB-5033] Fix deadlock in memory pool (#8123)
---
.../iotdb/db/mpp/execution/memory/MemoryPool.java | 84 ++++++++++++++--------
1 file changed, 55 insertions(+), 29 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index 11de85f863..8494fa4a23 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -25,18 +25,24 @@ import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Queue;
/** A thread-safe memory pool. */
public class MemoryPool {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MemoryPool.class);
+
public static class MemoryReservationFuture<V> extends AbstractFuture<V> {
private final String queryId;
private final long bytes;
@@ -180,41 +186,61 @@ public class MemoryPool {
return ((MemoryReservationFuture<Void>) future).getBytes();
}
- public synchronized void free(String queryId, long bytes) {
- Validate.notNull(queryId);
- Validate.isTrue(bytes > 0L);
-
- Long queryReservedBytes = queryMemoryReservations.get(queryId);
- Validate.notNull(queryReservedBytes);
- Validate.isTrue(bytes <= queryReservedBytes);
+ public void free(String queryId, long bytes) {
+ List<MemoryReservationFuture<Void>> futureList = new ArrayList<>();
+ synchronized (this) {
+ Validate.notNull(queryId);
+ Validate.isTrue(bytes > 0L);
- queryReservedBytes -= bytes;
- if (queryReservedBytes == 0) {
- queryMemoryReservations.remove(queryId);
- } else {
- queryMemoryReservations.put(queryId, queryReservedBytes);
- }
- reservedBytes -= bytes;
+ Long queryReservedBytes = queryMemoryReservations.get(queryId);
+ Validate.notNull(queryReservedBytes);
+ Validate.isTrue(bytes <= queryReservedBytes);
- if (memoryReservationFutures.isEmpty()) {
- return;
- }
- Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
- while (iterator.hasNext()) {
- MemoryReservationFuture<Void> future = iterator.next();
- if (future.isCancelled() || future.isDone()) {
- continue;
+ queryReservedBytes -= bytes;
+ if (queryReservedBytes == 0) {
+ queryMemoryReservations.remove(queryId);
+ } else {
+ queryMemoryReservations.put(queryId, queryReservedBytes);
}
- long bytesToReserve = future.getBytes();
- if (maxBytes - reservedBytes < bytesToReserve) {
+ reservedBytes -= bytes;
+
+ if (memoryReservationFutures.isEmpty()) {
return;
}
- if (maxBytesPerQuery - queryMemoryReservations.getOrDefault(future.getQueryId(), 0L)
- >= bytesToReserve) {
- reservedBytes += bytesToReserve;
- queryMemoryReservations.merge(future.getQueryId(), bytesToReserve, Long::sum);
+ Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
+ while (iterator.hasNext()) {
+ MemoryReservationFuture<Void> future = iterator.next();
+ if (future.isCancelled() || future.isDone()) {
+ continue;
+ }
+ long bytesToReserve = future.getBytes();
+ if (maxBytes - reservedBytes < bytesToReserve) {
+ return;
+ }
+ if (maxBytesPerQuery - queryMemoryReservations.getOrDefault(future.getQueryId(), 0L)
+ >= bytesToReserve) {
+ reservedBytes += bytesToReserve;
+ queryMemoryReservations.merge(future.getQueryId(), bytesToReserve, Long::sum);
+ futureList.add(future);
+ iterator.remove();
+ }
+ }
+ }
+
+ // why we need to put this outside MemoryPool's lock?
+ // If we put this block inside the MemoryPool's lock, we will get deadlock case like the
+ // following:
+ // Assuming that thread-A: LocalSourceHandle.receive() -> A-SharedTsBlockQueue.remove() ->
+ // MemoryPool.free() (hold MemoryPool's lock) -> future.set(null) -> try to get
+ // B-SharedTsBlockQueue's lock
+ // thread-B: LocalSourceHandle.receive() -> B-SharedTsBlockQueue.remove() (hold
+ // B-SharedTsBlockQueue's lock) -> try to get MemoryPool's lock
+ for (MemoryReservationFuture<Void> future : futureList) {
+ try {
future.set(null);
- iterator.remove();
+ } catch (Throwable t) {
+ // ignore it, because we still need to notify other future
+ LOGGER.error("error happened while trying to free memory: ", t);
}
}
}