You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/13 03:51:55 UTC

[iotdb] 02/02: [IOTDB-5501] Fix memory leak in MemoryPool

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch changcheng_0213
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 43056b51a26ac8477a70e9f51e1596d5ca8390e3
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Fri Feb 10 11:02:52 2023 +0800

    [IOTDB-5501] Fix memory leak in MemoryPool
---
 .../iotdb/db/mpp/common/FragmentInstanceId.java    |  7 ++++-
 .../mpp/execution/exchange/SharedTsBlockQueue.java | 23 ++++++++++----
 .../db/mpp/execution/exchange/SinkHandle.java      | 23 ++++++++++----
 .../db/mpp/execution/exchange/SourceHandle.java    | 25 ++++++++++++----
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  | 35 ++++++++++++++++------
 .../execution/exchange/LocalSinkHandleTest.java    | 14 +++++++--
 .../db/mpp/execution/exchange/SinkHandleTest.java  | 28 +++++++++++------
 .../mpp/execution/exchange/SourceHandleTest.java   | 10 +++++--
 8 files changed, 125 insertions(+), 40 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index b1c12bb729..793066b94a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -107,6 +107,11 @@ public class FragmentInstanceId {
   }
 
   public static String createFullId(String queryId, int fragmentId, String instanceId) {
-    return queryId + '.' + fragmentId + '.' + instanceId;
+    return queryId + "." + fragmentId + "." + instanceId;
+  }
+
+  public static String createFragmentInstanceIdFromTFragmentInstanceId(
+      TFragmentInstanceId tFragmentInstanceId) {
+    return tFragmentInstanceId.getFragmentId() + "." + tFragmentInstanceId.getInstanceId();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index ade913aebc..a40202de46 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -49,6 +50,8 @@ public class SharedTsBlockQueue {
 
   private final String localPlanNodeId;
 
+  private final String fullFragmentInstanceId;
+
   private final LocalMemoryManager localMemoryManager;
 
   private boolean noMoreTsBlocks = false;
@@ -81,6 +84,8 @@ public class SharedTsBlockQueue {
       LocalMemoryManager localMemoryManager) {
     this.localFragmentInstanceId =
         Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be null");
+    this.fullFragmentInstanceId =
+        FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
     this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be null");
     this.localMemoryManager =
         Validate.notNull(localMemoryManager, "local memory manager cannot be null");
@@ -159,7 +164,7 @@ public class SharedTsBlockQueue {
         .getQueryPool()
         .free(
             localFragmentInstanceId.getQueryId(),
-            localFragmentInstanceId.getInstanceId(),
+            fullFragmentInstanceId,
             localPlanNodeId,
             tsBlock.getRetainedSizeInBytes());
     bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
@@ -186,7 +191,7 @@ public class SharedTsBlockQueue {
             .getQueryPool()
             .reserve(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 tsBlock.getRetainedSizeInBytes(),
                 maxBytesCanReserve);
@@ -233,11 +238,15 @@ public class SharedTsBlockQueue {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
   }
 
   /** Destroy the queue and cancel the future. Should only be called in abnormal case */
@@ -258,11 +267,15 @@ public class SharedTsBlockQueue {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
   }
 
   /** Destroy the queue and cancel the future. Should only be called in abnormal case */
@@ -283,7 +296,7 @@ public class SharedTsBlockQueue {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 139387d510..fbc36ed34d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
@@ -69,6 +70,8 @@ public class SinkHandle implements ISinkHandle {
 
   private final String localPlanNodeId;
   private final TFragmentInstanceId localFragmentInstanceId;
+
+  private final String fullFragmentInstanceId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
   private final TsBlockSerde serde;
@@ -122,6 +125,8 @@ public class SinkHandle implements ISinkHandle {
     this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
     this.localPlanNodeId = Validate.notNull(localPlanNodeId);
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.fullFragmentInstanceId =
+        FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.executorService = Validate.notNull(executorService);
     this.serde = Validate.notNull(serde);
@@ -138,7 +143,7 @@ public class SinkHandle implements ISinkHandle {
             .getQueryPool()
             .reserve(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
                 DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) // actually we only know maxBytesCanReserve after
@@ -179,7 +184,7 @@ public class SinkHandle implements ISinkHandle {
               .getQueryPool()
               .reserve(
                   localFragmentInstanceId.getQueryId(),
-                  localFragmentInstanceId.getInstanceId(),
+                  fullFragmentInstanceId,
                   localPlanNodeId,
                   retainedSizeInBytes,
                   maxBytesCanReserve)
@@ -223,11 +228,15 @@ public class SinkHandle implements ISinkHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
     sinkHandleListener.onAborted(this);
     logger.debug("[EndAbortSinkHandle]");
   }
@@ -243,11 +252,15 @@ public class SinkHandle implements ISinkHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
     sinkHandleListener.onFinish(this);
     logger.debug("[EndCloseSinkHandle]");
   }
@@ -327,7 +340,7 @@ public class SinkHandle implements ISinkHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               freedBytes);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index d164e9d1de..386fdd40b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
@@ -68,6 +69,8 @@ public class SourceHandle implements ISourceHandle {
   private final TEndPoint remoteEndpoint;
   private final TFragmentInstanceId remoteFragmentInstanceId;
   private final TFragmentInstanceId localFragmentInstanceId;
+
+  private final String fullFragmentInstanceId;
   private final String localPlanNodeId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
@@ -123,6 +126,8 @@ public class SourceHandle implements ISourceHandle {
     this.remoteEndpoint = Validate.notNull(remoteEndpoint);
     this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.fullFragmentInstanceId =
+        FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
     this.localPlanNodeId = Validate.notNull(localPlanNodeId);
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.executorService = Validate.notNull(executorService);
@@ -172,7 +177,7 @@ public class SourceHandle implements ISourceHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               retainedSize);
 
@@ -214,7 +219,7 @@ public class SourceHandle implements ISourceHandle {
               .getQueryPool()
               .reserve(
                   localFragmentInstanceId.getQueryId(),
-                  localFragmentInstanceId.getInstanceId(),
+                  fullFragmentInstanceId,
                   localPlanNodeId,
                   bytesToReserve,
                   maxBytesCanReserve);
@@ -316,11 +321,15 @@ public class SourceHandle implements ISourceHandle {
             .getQueryPool()
             .free(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 bufferRetainedSizeInBytes);
         bufferRetainedSizeInBytes = 0;
       }
+      localMemoryManager
+          .getQueryPool()
+          .clearMemoryReservationMap(
+              localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
       aborted = true;
       sourceHandleListener.onAborted(this);
     }
@@ -349,11 +358,15 @@ public class SourceHandle implements ISourceHandle {
             .getQueryPool()
             .free(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 bufferRetainedSizeInBytes);
         bufferRetainedSizeInBytes = 0;
       }
+      localMemoryManager
+          .getQueryPool()
+          .clearMemoryReservationMap(
+              localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
       closed = true;
       currSequenceId = lastSequenceId + 1;
       sourceHandleListener.onFinished(this);
@@ -417,7 +430,7 @@ public class SourceHandle implements ISourceHandle {
         "Query[%s]-[%s-%s-SourceHandle-%s]",
         localFragmentInstanceId.getQueryId(),
         localFragmentInstanceId.getFragmentId(),
-        localFragmentInstanceId.getInstanceId(),
+        fullFragmentInstanceId,
         localPlanNodeId);
   }
 
@@ -527,7 +540,7 @@ public class SourceHandle implements ISourceHandle {
             .getQueryPool()
             .free(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 reservedBytes);
         sourceHandleListener.onFailure(SourceHandle.this, t);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index 190cf24eb0..6972111810 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -292,14 +292,11 @@ public class MemoryPool {
       Validate.isTrue(bytes <= queryReservedBytes);
 
       queryReservedBytes -= bytes;
-      if (queryReservedBytes == 0) {
-        queryMemoryReservations.get(queryId).get(fragmentInstanceId).remove(planNodeId);
-      } else {
-        queryMemoryReservations
-            .get(queryId)
-            .get(fragmentInstanceId)
-            .put(planNodeId, queryReservedBytes);
-      }
+      queryMemoryReservations
+          .get(queryId)
+          .get(fragmentInstanceId)
+          .put(planNodeId, queryReservedBytes);
+
       reservedBytes -= bytes;
 
       if (memoryReservationFutures.isEmpty()) {
@@ -350,7 +347,7 @@ public class MemoryPool {
         future.set(null);
       } catch (Throwable t) {
         // ignore it, because we still need to notify other future
-        LOGGER.error("error happened while trying to free memory: ", t);
+        LOGGER.warn("error happened while trying to free memory: ", t);
       }
     }
   }
@@ -369,4 +366,24 @@ public class MemoryPool {
   public long getReservedBytes() {
     return reservedBytes;
   }
+
+  public void clearMemoryReservationMap(
+      String queryId, String fragmentInstanceId, String planNodeId) {
+    if (queryMemoryReservations.get(queryId) == null
+        || queryMemoryReservations.get(queryId).get(fragmentInstanceId) == null) {
+      return;
+    }
+    Map<String, Long> planNodeIdToBytesReserved =
+        queryMemoryReservations.get(queryId).get(fragmentInstanceId);
+    if (planNodeIdToBytesReserved.get(planNodeId) == null
+        || planNodeIdToBytesReserved.get(planNodeId) <= 0) {
+      planNodeIdToBytesReserved.remove(planNodeId);
+      if (planNodeIdToBytesReserved.isEmpty()) {
+        queryMemoryReservations.get(queryId).remove(fragmentInstanceId);
+      }
+      if (queryMemoryReservations.get(queryId).isEmpty()) {
+        queryMemoryReservations.remove(queryId);
+      }
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
index 0a022ae06b..8bcb1b300b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.execution.exchange;
 
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -86,7 +87,8 @@ public class LocalSinkHandleTest {
     Mockito.verify(spyMemoryPool, Mockito.times(11))
         .reserve(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                remoteFragmentInstanceId),
             remotePlanNodeId,
             mockTsBlockSize,
             Long.MAX_VALUE);
@@ -102,7 +104,12 @@ public class LocalSinkHandleTest {
     Assert.assertFalse(localSinkHandle.isFinished());
     Assert.assertEquals(0L, localSinkHandle.getBufferRetainedSizeInBytes());
     Mockito.verify(spyMemoryPool, Mockito.times(11))
-        .free(queryId, localFragmentInstanceId.getInstanceId(), remotePlanNodeId, mockTsBlockSize);
+        .free(
+            queryId,
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                remoteFragmentInstanceId),
+            remotePlanNodeId,
+            mockTsBlockSize);
 
     // Set no-more-TsBlocks.
     localSinkHandle.setNoMoreTsBlocks();
@@ -169,7 +176,8 @@ public class LocalSinkHandleTest {
     Mockito.verify(spyMemoryPool, Mockito.times(11))
         .reserve(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                remoteFragmentInstanceId),
             remotePlanNodeId,
             mockTsBlockSize,
             Long.MAX_VALUE);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index 0c8ebf9f73..d50e69dd2a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -116,7 +117,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //
+    // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
@@ -164,7 +166,8 @@ public class SinkHandleTest {
     Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
         .free(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock * mockTsBlockSize);
     Mockito.verify(mockSinkHandleListener, Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle);
@@ -201,7 +204,8 @@ public class SinkHandleTest {
     MemoryPool mockMemoryPool =
         Utils.createMockBlockedMemoryPool(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock,
             mockTsBlockSize);
@@ -261,7 +265,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //
+    // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
@@ -302,7 +307,8 @@ public class SinkHandleTest {
     Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
         .free(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock * mockTsBlockSize);
 
@@ -318,7 +324,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(3))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //
+    // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
@@ -378,7 +385,8 @@ public class SinkHandleTest {
     Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(2))
         .free(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock * mockTsBlockSize);
     Mockito.verify(mockSinkHandleListener, Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle);
@@ -401,7 +409,8 @@ public class SinkHandleTest {
     MemoryPool mockMemoryPool =
         Utils.createMockBlockedMemoryPool(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock,
             mockTsBlockSize);
@@ -462,7 +471,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //
+    // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
index 1a5ad7560f..d0b8f49f29 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -252,7 +253,8 @@ public class SourceHandleTest {
       Mockito.verify(spyMemoryPool, Mockito.timeout(10_000).times(6))
           .reserve(
               queryId,
-              localFragmentInstanceId.getInstanceId(),
+              FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                  localFragmentInstanceId),
               localPlanNodeId,
               MOCK_TSBLOCK_SIZE,
               maxBytesCanReserve);
@@ -283,7 +285,11 @@ public class SourceHandleTest {
     for (int i = 0; i < numOfMockTsBlock; i++) {
       Mockito.verify(spyMemoryPool, Mockito.timeout(10_0000).times(i))
           .free(
-              queryId, localFragmentInstanceId.getInstanceId(), localPlanNodeId, MOCK_TSBLOCK_SIZE);
+              queryId,
+              FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                  localFragmentInstanceId),
+              localPlanNodeId,
+              MOCK_TSBLOCK_SIZE);
       sourceHandle.receive();
       try {
         if (i < 5) {