You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/02/10 11:48:49 UTC

[iotdb] branch rel/1.0 updated: [To rel/1.0][IOTDB-5501] Fix memory leak in MemoryPool

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new 8af48e36d5 [To rel/1.0][IOTDB-5501] Fix memory leak in MemoryPool
8af48e36d5 is described below

commit 8af48e36d5957a5f52d82399f08247f58942bbf8
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Fri Feb 10 19:48:43 2023 +0800

    [To rel/1.0][IOTDB-5501] Fix memory leak in MemoryPool
---
 .../iotdb/db/mpp/common/FragmentInstanceId.java    |  6 ++++
 .../mpp/execution/exchange/SharedTsBlockQueue.java | 29 ++++++++++++++----
 .../db/mpp/execution/exchange/SinkHandle.java      | 22 ++++++++++----
 .../db/mpp/execution/exchange/SourceHandle.java    | 24 +++++++++++----
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  | 35 ++++++++++++++++------
 .../execution/exchange/LocalSinkHandleTest.java    | 14 +++++++--
 .../db/mpp/execution/exchange/SinkHandleTest.java  | 28 +++++++++++------
 .../mpp/execution/exchange/SourceHandleTest.java   | 11 +++++--
 8 files changed, 130 insertions(+), 39 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index e75d264700..070f7524bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -109,4 +109,10 @@ public class FragmentInstanceId {
   public static String createFullId(String queryId, int fragmentId, String instanceId) {
     return String.format("%s.%d.%s", queryId, fragmentId, instanceId);
   }
+
+  public static String createFragmentInstanceIdFromTFragmentInstanceId(
+      TFragmentInstanceId tFragmentInstanceId) {
+    return String.format(
+        "%d.%s", tFragmentInstanceId.getFragmentId(), tFragmentInstanceId.getInstanceId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 95e64b1828..1eca7b7bd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -48,6 +49,8 @@ public class SharedTsBlockQueue {
   private final TFragmentInstanceId localFragmentInstanceId;
 
   private final String localPlanNodeId;
+
+  private final String fullFragmentInstanceId;
   private final LocalMemoryManager localMemoryManager;
 
   private boolean noMoreTsBlocks = false;
@@ -80,6 +83,8 @@ public class SharedTsBlockQueue {
       LocalMemoryManager localMemoryManager) {
     this.localFragmentInstanceId =
         Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be null");
+    this.fullFragmentInstanceId =
+        FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
     this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be null");
     this.localMemoryManager =
         Validate.notNull(localMemoryManager, "local memory manager cannot be null");
@@ -154,7 +159,7 @@ public class SharedTsBlockQueue {
         .getQueryPool()
         .free(
             localFragmentInstanceId.getQueryId(),
-            localFragmentInstanceId.getInstanceId(),
+            fullFragmentInstanceId,
             localPlanNodeId,
             tsBlock.getRetainedSizeInBytes());
     bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
@@ -181,7 +186,7 @@ public class SharedTsBlockQueue {
             .getQueryPool()
             .reserve(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 tsBlock.getRetainedSizeInBytes(),
                 maxBytesCanReserve);
@@ -228,11 +233,16 @@ public class SharedTsBlockQueue {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
   }
 
   /** Destroy the queue and cancel the future. Should only be called in abnormal case */
@@ -253,11 +263,16 @@ public class SharedTsBlockQueue {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
   }
 
   /** Destroy the queue and cancel the future. Should only be called in abnormal case */
@@ -278,10 +293,14 @@ public class SharedTsBlockQueue {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 50a3f2e1e8..626c1b5698 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -65,6 +66,7 @@ public class SinkHandle implements ISinkHandle {
 
   private final String localPlanNodeId;
   private final TFragmentInstanceId localFragmentInstanceId;
+  private final String fullFragmentInstanceId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
   private final TsBlockSerde serde;
@@ -116,6 +118,8 @@ public class SinkHandle implements ISinkHandle {
     this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
     this.localPlanNodeId = Validate.notNull(localPlanNodeId);
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.fullFragmentInstanceId =
+        FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.executorService = Validate.notNull(executorService);
     this.serde = Validate.notNull(serde);
@@ -132,7 +136,7 @@ public class SinkHandle implements ISinkHandle {
             .getQueryPool()
             .reserve(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
                 DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) // actually we only know maxBytesCanReserve after
@@ -171,7 +175,7 @@ public class SinkHandle implements ISinkHandle {
             .getQueryPool()
             .reserve(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 retainedSizeInBytes,
                 maxBytesCanReserve)
@@ -211,11 +215,15 @@ public class SinkHandle implements ISinkHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
     sinkHandleListener.onAborted(this);
     logger.debug("[EndAbortSinkHandle]");
   }
@@ -231,11 +239,15 @@ public class SinkHandle implements ISinkHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
     sinkHandleListener.onFinish(this);
     logger.debug("[EndCloseSinkHandle]");
   }
@@ -315,7 +327,7 @@ public class SinkHandle implements ISinkHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               freedBytes);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index cda072ff0f..edfa85afcd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -61,6 +62,7 @@ public class SourceHandle implements ISourceHandle {
   private final TEndPoint remoteEndpoint;
   private final TFragmentInstanceId remoteFragmentInstanceId;
   private final TFragmentInstanceId localFragmentInstanceId;
+  private final String fullFragmentInstanceId;
   private final String localPlanNodeId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
@@ -114,6 +116,8 @@ public class SourceHandle implements ISourceHandle {
     this.remoteEndpoint = Validate.notNull(remoteEndpoint);
     this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.fullFragmentInstanceId =
+        FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
     this.localPlanNodeId = Validate.notNull(localPlanNodeId);
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.executorService = Validate.notNull(executorService);
@@ -156,7 +160,7 @@ public class SourceHandle implements ISourceHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               retainedSize);
 
@@ -195,7 +199,7 @@ public class SourceHandle implements ISourceHandle {
               .getQueryPool()
               .reserve(
                   localFragmentInstanceId.getQueryId(),
-                  localFragmentInstanceId.getInstanceId(),
+                  fullFragmentInstanceId,
                   localPlanNodeId,
                   bytesToReserve,
                   maxBytesCanReserve);
@@ -297,11 +301,15 @@ public class SourceHandle implements ISourceHandle {
             .getQueryPool()
             .free(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 bufferRetainedSizeInBytes);
         bufferRetainedSizeInBytes = 0;
       }
+      localMemoryManager
+          .getQueryPool()
+          .clearMemoryReservationMap(
+              localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
       aborted = true;
       sourceHandleListener.onAborted(this);
     }
@@ -330,11 +338,15 @@ public class SourceHandle implements ISourceHandle {
             .getQueryPool()
             .free(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 bufferRetainedSizeInBytes);
         bufferRetainedSizeInBytes = 0;
       }
+      localMemoryManager
+          .getQueryPool()
+          .clearMemoryReservationMap(
+              localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
       closed = true;
       currSequenceId = lastSequenceId + 1;
       sourceHandleListener.onFinished(this);
@@ -398,7 +410,7 @@ public class SourceHandle implements ISourceHandle {
         "Query[%s]-[%s-%s-SourceHandle-%s]",
         localFragmentInstanceId.getQueryId(),
         localFragmentInstanceId.getFragmentId(),
-        localFragmentInstanceId.getInstanceId(),
+        fullFragmentInstanceId,
         localPlanNodeId);
   }
 
@@ -500,7 +512,7 @@ public class SourceHandle implements ISourceHandle {
             .getQueryPool()
             .free(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 reservedBytes);
         sourceHandleListener.onFailure(SourceHandle.this, t);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index b3d2fc2ff8..b2970bb22e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -291,14 +291,11 @@ public class MemoryPool {
       Validate.isTrue(bytes <= queryReservedBytes);
 
       queryReservedBytes -= bytes;
-      if (queryReservedBytes == 0) {
-        queryMemoryReservations.get(queryId).get(fragmentInstanceId).remove(planNodeId);
-      } else {
-        queryMemoryReservations
-            .get(queryId)
-            .get(fragmentInstanceId)
-            .put(planNodeId, queryReservedBytes);
-      }
+      queryMemoryReservations
+          .get(queryId)
+          .get(fragmentInstanceId)
+          .put(planNodeId, queryReservedBytes);
+
       reservedBytes -= bytes;
 
       if (memoryReservationFutures.isEmpty()) {
@@ -349,7 +346,7 @@ public class MemoryPool {
         future.set(null);
       } catch (Throwable t) {
         // ignore it, because we still need to notify other future
-        LOGGER.error("error happened while trying to free memory: ", t);
+        LOGGER.warn("error happened while trying to free memory: ", t);
       }
     }
   }
@@ -368,4 +365,24 @@ public class MemoryPool {
   public long getReservedBytes() {
     return reservedBytes;
   }
+
+  public synchronized void clearMemoryReservationMap(
+      String queryId, String fragmentInstanceId, String planNodeId) {
+    if (queryMemoryReservations.get(queryId) == null
+        || queryMemoryReservations.get(queryId).get(fragmentInstanceId) == null) {
+      return;
+    }
+    Map<String, Long> planNodeIdToBytesReserved =
+        queryMemoryReservations.get(queryId).get(fragmentInstanceId);
+    if (planNodeIdToBytesReserved.get(planNodeId) == null
+        || planNodeIdToBytesReserved.get(planNodeId) <= 0) {
+      planNodeIdToBytesReserved.remove(planNodeId);
+      if (planNodeIdToBytesReserved.isEmpty()) {
+        queryMemoryReservations.get(queryId).remove(fragmentInstanceId);
+      }
+      if (queryMemoryReservations.get(queryId).isEmpty()) {
+        queryMemoryReservations.remove(queryId);
+      }
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
index 37055a6a24..bac907fa4a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.execution.exchange;
 
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -91,7 +92,8 @@ public class LocalSinkHandleTest {
     Mockito.verify(spyMemoryPool, Mockito.times(11))
         .reserve(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                remoteFragmentInstanceId),
             remotePlanNodeId,
             mockTsBlockSize,
             Long.MAX_VALUE);
@@ -107,7 +109,12 @@ public class LocalSinkHandleTest {
     Assert.assertFalse(localSinkHandle.isFinished());
     Assert.assertEquals(0L, localSinkHandle.getBufferRetainedSizeInBytes());
     Mockito.verify(spyMemoryPool, Mockito.times(11))
-        .free(queryId, localFragmentInstanceId.getInstanceId(), remotePlanNodeId, mockTsBlockSize);
+        .free(
+            queryId,
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                remoteFragmentInstanceId),
+            remotePlanNodeId,
+            mockTsBlockSize);
 
     // Set no-more-TsBlocks.
     localSinkHandle.setNoMoreTsBlocks();
@@ -179,7 +186,8 @@ public class LocalSinkHandleTest {
     Mockito.verify(spyMemoryPool, Mockito.times(11))
         .reserve(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                remoteFragmentInstanceId),
             remotePlanNodeId,
             mockTsBlockSize,
             Long.MAX_VALUE);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index 0c8ebf9f73..8c033e2b02 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -116,7 +117,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+    //            remoteFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
@@ -164,7 +166,8 @@ public class SinkHandleTest {
     Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
         .free(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock * mockTsBlockSize);
     Mockito.verify(mockSinkHandleListener, Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle);
@@ -201,7 +204,8 @@ public class SinkHandleTest {
     MemoryPool mockMemoryPool =
         Utils.createMockBlockedMemoryPool(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock,
             mockTsBlockSize);
@@ -261,7 +265,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+    //            remoteFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
@@ -302,7 +307,8 @@ public class SinkHandleTest {
     Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
         .free(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock * mockTsBlockSize);
 
@@ -318,7 +324,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(3))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+    //            remoteFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
@@ -378,7 +385,8 @@ public class SinkHandleTest {
     Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(2))
         .free(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock * mockTsBlockSize);
     Mockito.verify(mockSinkHandleListener, Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle);
@@ -401,7 +409,8 @@ public class SinkHandleTest {
     MemoryPool mockMemoryPool =
         Utils.createMockBlockedMemoryPool(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock,
             mockTsBlockSize);
@@ -462,7 +471,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+    //            remoteFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
index aa9adbace6..ae2c2c3f6d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -233,7 +234,8 @@ public class SourceHandleTest {
       Mockito.verify(spyMemoryPool, Mockito.timeout(10_000).times(6))
           .reserve(
               queryId,
-              localFragmentInstanceId.getInstanceId(),
+              FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                  localFragmentInstanceId),
               localPlanNodeId,
               mockTsBlockSize,
               5 * mockTsBlockSize);
@@ -263,7 +265,12 @@ public class SourceHandleTest {
     // The local fragment instance consumes the data blocks.
     for (int i = 0; i < numOfMockTsBlock; i++) {
       Mockito.verify(spyMemoryPool, Mockito.timeout(10_0000).times(i))
-          .free(queryId, localFragmentInstanceId.getInstanceId(), localPlanNodeId, mockTsBlockSize);
+          .free(
+              queryId,
+              FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                  localFragmentInstanceId),
+              localPlanNodeId,
+              mockTsBlockSize);
       sourceHandle.receive();
       try {
         if (i < 5) {