You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/03/22 06:14:06 UTC

[iotdb] 01/01: Fix concurrent bug caused by identitySink

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch concurrentBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8ccfb7f0fce6300533361aeb38cd6dea15ddff6c
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Mar 22 14:13:03 2023 +0800

    Fix concurrent bug caused by identitySink
---
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  | 41 ++++++++++++++--------
 1 file changed, 26 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index 7d4cda4b34..37dfc13d9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -260,18 +260,22 @@ public class MemoryPool {
     Validate.notNull(queryId);
     Validate.isTrue(bytes > 0L);
 
-    Long queryReservedBytes =
-        queryMemoryReservations
-            .getOrDefault(queryId, Collections.emptyMap())
-            .getOrDefault(fragmentInstanceId, Collections.emptyMap())
-            .computeIfPresent(
-                planNodeId,
-                (k, reservedMemory) -> {
-                  if (reservedMemory < bytes) {
-                    throw new IllegalArgumentException("Free more memory than has been reserved.");
-                  }
-                  return reservedMemory - bytes;
-                });
+    try {
+      queryMemoryReservations
+          .get(queryId)
+          .get(fragmentInstanceId)
+          .computeIfPresent(
+              planNodeId,
+              (k, reservedMemory) -> {
+                if (reservedMemory < bytes) {
+                  throw new IllegalArgumentException("Free more memory than has been reserved.");
+                }
+                return reservedMemory - bytes;
+              });
+    } catch (NullPointerException e) {
+      throw new IllegalArgumentException("RelatedMemoryReserved can't be null when freeing memory");
+    }
+
     remainingBytes.addAndGet(bytes);
 
     List<MemoryReservationFuture<Void>> futureList = new ArrayList<>();
@@ -345,9 +349,16 @@ public class MemoryPool {
       return;
     }
 
-    if (planNodeIdToBytesReserved.get(planNodeId) == null
-        || planNodeIdToBytesReserved.get(planNodeId) == 0) {
-      planNodeIdToBytesReserved.remove(planNodeId);
+    Long newValue =
+        planNodeIdToBytesReserved.computeIfPresent(
+            planNodeId,
+            (k, memoryReserved) -> {
+              if (memoryReserved == 0) {
+                return null;
+              }
+              return memoryReserved;
+            });
+    if (newValue == null) {
       instanceBytesReserved.computeIfPresent(
           fragmentInstanceId,
           (k, kPlanNodeBytesReserved) -> {