You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/03/23 05:04:46 UTC

[iotdb] 01/02: Reduce the scope of lock in MemoryPool

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch concurrentBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 16917f89ea1736bf0204d06e7b1b67c86dcc481c
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Mar 23 13:03:53 2023 +0800

    Reduce the scope of lock in MemoryPool
---
 .../db/exception/runtime/MemoryLeakException.java  | 27 +++++++
 .../iotdb/db/mpp/common/FragmentInstanceId.java    |  4 +
 .../execution/exchange/MPPDataExchangeManager.java |  6 ++
 .../mpp/execution/exchange/SharedTsBlockQueue.java | 15 ++--
 .../mpp/execution/exchange/sink/SinkChannel.java   |  8 --
 .../execution/exchange/source/SourceHandle.java    |  8 --
 .../fragment/FragmentInstanceExecution.java        |  5 ++
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  | 93 +++++++++++-----------
 8 files changed, 92 insertions(+), 74 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/exception/runtime/MemoryLeakException.java b/server/src/main/java/org/apache/iotdb/db/exception/runtime/MemoryLeakException.java
new file mode 100644
index 0000000000..beb9a83951
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/runtime/MemoryLeakException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception.runtime;
+
+public class MemoryLeakException extends RuntimeException {
+
+  public MemoryLeakException(String message) {
+    super(message);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index 793066b94a..24164c4e89 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -57,6 +57,10 @@ public class FragmentInstanceId {
     return instanceId;
   }
 
+  public String getFragmentInstanceId() {
+    return fragmentId + "." + instanceId;
+  }
+
   public String toString() {
     return fullId;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 501781b408..9a58904c79 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -470,6 +470,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
     return mppDataExchangeService;
   }
 
+  public void deRegisterFragmentInstanceFromMemoryPool(String queryId, String fragmentInstanceId) {
+    localMemoryManager
+        .getQueryPool()
+        .deRegisterFragmentInstanceToQueryMemoryMap(queryId, fragmentInstanceId);
+  }
+
   private synchronized ISinkChannel createLocalSinkChannel(
       TFragmentInstanceId localFragmentInstanceId,
       TFragmentInstanceId remoteFragmentInstanceId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 53d668cc86..e496ac98e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -30,12 +30,11 @@ import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.commons.lang3.Validate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.NotThreadSafe;
-
 import java.util.LinkedList;
 import java.util.Queue;
 
@@ -91,6 +90,10 @@ public class SharedTsBlockQueue {
     this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be null");
     this.localMemoryManager =
         Validate.notNull(localMemoryManager, "local memory manager cannot be null");
+    localMemoryManager
+        .getQueryPool()
+        .registerPlanNodeIdToQueryMemoryMap(
+            fragmentInstanceId.queryId, fullFragmentInstanceId, planNodeId);
   }
 
   public boolean hasNoMoreTsBlocks() {
@@ -260,10 +263,6 @@ public class SharedTsBlockQueue {
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
-    localMemoryManager
-        .getQueryPool()
-        .clearMemoryReservationMap(
-            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
   }
 
   /** Destroy the queue and cancel the future. Should only be called in abnormal case */
@@ -289,10 +288,6 @@ public class SharedTsBlockQueue {
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
-    localMemoryManager
-        .getQueryPool()
-        .clearMemoryReservationMap(
-            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
   }
 
   /** Destroy the queue and cancel the future. Should only be called in abnormal case */
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index 5cd28de462..b316e2bbd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -222,10 +222,6 @@ public class SinkChannel implements ISinkChannel {
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
-    localMemoryManager
-        .getQueryPool()
-        .clearMemoryReservationMap(
-            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
     sinkListener.onAborted(this);
     aborted = true;
     LOGGER.debug("[EndAbortSinkChannel]");
@@ -249,10 +245,6 @@ public class SinkChannel implements ISinkChannel {
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
-    localMemoryManager
-        .getQueryPool()
-        .clearMemoryReservationMap(
-            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
     sinkListener.onFinish(this);
     closed = true;
     LOGGER.debug("[EndCloseSinkChannel]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index b66dd4bce4..5d118d310c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -332,10 +332,6 @@ public class SourceHandle implements ISourceHandle {
                 bufferRetainedSizeInBytes);
         bufferRetainedSizeInBytes = 0;
       }
-      localMemoryManager
-          .getQueryPool()
-          .clearMemoryReservationMap(
-              localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
       aborted = true;
       sourceHandleListener.onAborted(this);
     }
@@ -369,10 +365,6 @@ public class SourceHandle implements ISourceHandle {
                 bufferRetainedSizeInBytes);
         bufferRetainedSizeInBytes = 0;
       }
-      localMemoryManager
-          .getQueryPool()
-          .clearMemoryReservationMap(
-              localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
       closed = true;
       executorService.submit(new SendCloseSinkChannelEventTask());
       currSequenceId = lastSequenceId + 1;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index f9c17c9963..b402f0892e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.driver.IDriver;
+import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink;
 import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -137,6 +138,10 @@ public class FragmentInstanceExecution {
             context.releaseResource();
             // help for gc
             drivers = null;
+            MPPDataExchangeService.getInstance()
+                .getMPPDataExchangeManager()
+                .deRegisterFragmentInstanceFromMemoryPool(
+                    instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId());
             if (newState.isFailed()) {
               scheduler.abortFragmentInstance(instanceId);
             }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index 37dfc13d9a..a9acf90e43 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -20,19 +20,18 @@
 package org.apache.iotdb.db.mpp.execution.memory;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.exception.runtime.MemoryLeakException;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.Validate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -149,6 +148,44 @@ public class MemoryPool {
     return id;
   }
 
+  /**
+   * Before executing, we register memory map which is related to queryId, fragmentInstanceId, and
+   * planNodeId to queryMemoryReservationsMap first.
+   */
+  public synchronized void registerPlanNodeIdToQueryMemoryMap(
+      String queryId, String fragmentInstanceId, String planNodeId) {
+    queryMemoryReservations
+        .computeIfAbsent(queryId, x -> new ConcurrentHashMap<>())
+        .computeIfAbsent(fragmentInstanceId, x -> new ConcurrentHashMap<>())
+        .putIfAbsent(planNodeId, 0L);
+  }
+
+  /**
+   * If all fragmentInstanceIds related to one queryId have been registered, when the last fragment
+   * instance is deregister, the queryId can be cleared.
+   *
+   * <p>If some fragmentInstanceIds have not been registered when queryId is cleared, they will
+   * register queryId again with lock, so there is no concurrency problem.
+   */
+  public void deRegisterFragmentInstanceToQueryMemoryMap(
+      String queryId, String fragmentInstanceId) {
+    Map<String, Long> planNodeRelatedMemory =
+        queryMemoryReservations.get(queryId).get(fragmentInstanceId);
+    for (Long memoryReserved : planNodeRelatedMemory.values()) {
+      if (memoryReserved != 0) {
+        throw new MemoryLeakException(
+            "PlanNode related memory is not zero when deregister fragment instance from query memory pool.");
+      }
+    }
+    synchronized (queryMemoryReservations) {
+      Map<String, Map<String, Long>> queryRelatedMemory = queryMemoryReservations.get(queryId);
+      queryRelatedMemory.remove(fragmentInstanceId);
+      if (queryRelatedMemory.isEmpty()) {
+        queryMemoryReservations.remove(queryId);
+      }
+    }
+  }
+
   /**
    * Reserve memory with bytesToReserve.
    *
@@ -338,47 +375,7 @@ public class MemoryPool {
     return maxBytes - remainingBytes.get();
   }
 
-  public void clearMemoryReservationMap(
-      String queryId, String fragmentInstanceId, String planNodeId) {
-    Map<String, Map<String, Long>> instanceBytesReserved = queryMemoryReservations.get(queryId);
-    Map<String, Long> planNodeIdToBytesReserved =
-        queryMemoryReservations
-            .getOrDefault(queryId, Collections.emptyMap())
-            .get(fragmentInstanceId);
-    if (instanceBytesReserved == null || planNodeIdToBytesReserved == null) {
-      return;
-    }
-
-    Long newValue =
-        planNodeIdToBytesReserved.computeIfPresent(
-            planNodeId,
-            (k, memoryReserved) -> {
-              if (memoryReserved == 0) {
-                return null;
-              }
-              return memoryReserved;
-            });
-    if (newValue == null) {
-      instanceBytesReserved.computeIfPresent(
-          fragmentInstanceId,
-          (k, kPlanNodeBytesReserved) -> {
-            if (kPlanNodeBytesReserved.isEmpty()) {
-              return null;
-            }
-            return kPlanNodeBytesReserved;
-          });
-      queryMemoryReservations.computeIfPresent(
-          queryId,
-          (k, kInstanceBytesReserved) -> {
-            if (kInstanceBytesReserved.isEmpty()) {
-              return null;
-            }
-            return kInstanceBytesReserved;
-          });
-    }
-  }
-
-  private boolean tryReserve(
+  public boolean tryReserve(
       String queryId,
       String fragmentInstanceId,
       String planNodeId,
@@ -388,8 +385,8 @@ public class MemoryPool {
     long queryRemainingBytes =
         maxBytesCanReserve
             - queryMemoryReservations
-                .computeIfAbsent(queryId, x -> new ConcurrentHashMap<>())
-                .computeIfAbsent(fragmentInstanceId, x -> new ConcurrentHashMap<>())
+                .get(queryId)
+                .get(fragmentInstanceId)
                 .merge(planNodeId, bytesToReserve, Long::sum);
     return tryRemainingBytes >= 0 && queryRemainingBytes >= 0;
   }
@@ -397,8 +394,8 @@ public class MemoryPool {
   private void rollbackReserve(
       String queryId, String fragmentInstanceId, String planNodeId, long bytesToReserve) {
     queryMemoryReservations
-        .computeIfAbsent(queryId, x -> new ConcurrentHashMap<>())
-        .computeIfAbsent(fragmentInstanceId, x -> new ConcurrentHashMap<>())
+        .get(queryId)
+        .get(fragmentInstanceId)
         .merge(planNodeId, -bytesToReserve, Long::sum);
     remainingBytes.addAndGet(bytesToReserve);
   }