You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/03/22 06:14:06 UTC
[iotdb] 01/01: Fix concurrency bug caused by identitySink
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch concurrentBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8ccfb7f0fce6300533361aeb38cd6dea15ddff6c
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Mar 22 14:13:03 2023 +0800
Fix concurrency bug caused by identitySink
---
.../iotdb/db/mpp/execution/memory/MemoryPool.java | 41 ++++++++++++++--------
1 file changed, 26 insertions(+), 15 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index 7d4cda4b34..37dfc13d9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -260,18 +260,22 @@ public class MemoryPool {
Validate.notNull(queryId);
Validate.isTrue(bytes > 0L);
- Long queryReservedBytes =
- queryMemoryReservations
- .getOrDefault(queryId, Collections.emptyMap())
- .getOrDefault(fragmentInstanceId, Collections.emptyMap())
- .computeIfPresent(
- planNodeId,
- (k, reservedMemory) -> {
- if (reservedMemory < bytes) {
- throw new IllegalArgumentException("Free more memory than has been reserved.");
- }
- return reservedMemory - bytes;
- });
+ try {
+ queryMemoryReservations
+ .get(queryId)
+ .get(fragmentInstanceId)
+ .computeIfPresent(
+ planNodeId,
+ (k, reservedMemory) -> {
+ if (reservedMemory < bytes) {
+ throw new IllegalArgumentException("Free more memory than has been reserved.");
+ }
+ return reservedMemory - bytes;
+ });
+ } catch (NullPointerException e) {
+ throw new IllegalArgumentException("RelatedMemoryReserved can't be null when freeing memory");
+ }
+
remainingBytes.addAndGet(bytes);
List<MemoryReservationFuture<Void>> futureList = new ArrayList<>();
@@ -345,9 +349,16 @@ public class MemoryPool {
return;
}
- if (planNodeIdToBytesReserved.get(planNodeId) == null
- || planNodeIdToBytesReserved.get(planNodeId) == 0) {
- planNodeIdToBytesReserved.remove(planNodeId);
+ Long newValue =
+ planNodeIdToBytesReserved.computeIfPresent(
+ planNodeId,
+ (k, memoryReserved) -> {
+ if (memoryReserved == 0) {
+ return null;
+ }
+ return memoryReserved;
+ });
+ if (newValue == null) {
instanceBytesReserved.computeIfPresent(
fragmentInstanceId,
(k, kPlanNodeBytesReserved) -> {