You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/02/10 11:48:49 UTC
[iotdb] branch rel/1.0 updated: [To rel/1.0][IOTDB-5501] Fix memory leak in MemoryPool
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new 8af48e36d5 [To rel/1.0][IOTDB-5501] Fix memory leak in MemoryPool
8af48e36d5 is described below
commit 8af48e36d5957a5f52d82399f08247f58942bbf8
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Fri Feb 10 19:48:43 2023 +0800
[To rel/1.0][IOTDB-5501] Fix memory leak in MemoryPool
---
.../iotdb/db/mpp/common/FragmentInstanceId.java | 6 ++++
.../mpp/execution/exchange/SharedTsBlockQueue.java | 29 ++++++++++++++----
.../db/mpp/execution/exchange/SinkHandle.java | 22 ++++++++++----
.../db/mpp/execution/exchange/SourceHandle.java | 24 +++++++++++----
.../iotdb/db/mpp/execution/memory/MemoryPool.java | 35 ++++++++++++++++------
.../execution/exchange/LocalSinkHandleTest.java | 14 +++++++--
.../db/mpp/execution/exchange/SinkHandleTest.java | 28 +++++++++++------
.../mpp/execution/exchange/SourceHandleTest.java | 11 +++++--
8 files changed, 130 insertions(+), 39 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index e75d264700..070f7524bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -109,4 +109,10 @@ public class FragmentInstanceId {
public static String createFullId(String queryId, int fragmentId, String instanceId) {
return String.format("%s.%d.%s", queryId, fragmentId, instanceId);
}
+
+ public static String createFragmentInstanceIdFromTFragmentInstanceId(
+ TFragmentInstanceId tFragmentInstanceId) {
+ return String.format(
+ "%d.%s", tFragmentInstanceId.getFragmentId(), tFragmentInstanceId.getInstanceId());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 95e64b1828..1eca7b7bd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.execution.exchange;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -48,6 +49,8 @@ public class SharedTsBlockQueue {
private final TFragmentInstanceId localFragmentInstanceId;
private final String localPlanNodeId;
+
+ private final String fullFragmentInstanceId;
private final LocalMemoryManager localMemoryManager;
private boolean noMoreTsBlocks = false;
@@ -80,6 +83,8 @@ public class SharedTsBlockQueue {
LocalMemoryManager localMemoryManager) {
this.localFragmentInstanceId =
Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be null");
+ this.fullFragmentInstanceId =
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be null");
this.localMemoryManager =
Validate.notNull(localMemoryManager, "local memory manager cannot be null");
@@ -154,7 +159,7 @@ public class SharedTsBlockQueue {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getRetainedSizeInBytes());
bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
@@ -181,7 +186,7 @@ public class SharedTsBlockQueue {
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getRetainedSizeInBytes(),
maxBytesCanReserve);
@@ -228,11 +233,16 @@ public class SharedTsBlockQueue {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
}
/** Destroy the queue and cancel the future. Should only be called in abnormal case */
@@ -253,11 +263,16 @@ public class SharedTsBlockQueue {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
}
/** Destroy the queue and cancel the future. Should only be called in abnormal case */
@@ -278,10 +293,14 @@ public class SharedTsBlockQueue {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 50a3f2e1e8..626c1b5698 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.utils.SetThreadName;
@@ -65,6 +66,7 @@ public class SinkHandle implements ISinkHandle {
private final String localPlanNodeId;
private final TFragmentInstanceId localFragmentInstanceId;
+ private final String fullFragmentInstanceId;
private final LocalMemoryManager localMemoryManager;
private final ExecutorService executorService;
private final TsBlockSerde serde;
@@ -116,6 +118,8 @@ public class SinkHandle implements ISinkHandle {
this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
this.localPlanNodeId = Validate.notNull(localPlanNodeId);
this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+ this.fullFragmentInstanceId =
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
this.localMemoryManager = Validate.notNull(localMemoryManager);
this.executorService = Validate.notNull(executorService);
this.serde = Validate.notNull(serde);
@@ -132,7 +136,7 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) // actually we only know maxBytesCanReserve after
@@ -171,7 +175,7 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
retainedSizeInBytes,
maxBytesCanReserve)
@@ -211,11 +215,15 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
sinkHandleListener.onAborted(this);
logger.debug("[EndAbortSinkHandle]");
}
@@ -231,11 +239,15 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
sinkHandleListener.onFinish(this);
logger.debug("[EndCloseSinkHandle]");
}
@@ -315,7 +327,7 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
freedBytes);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index cda072ff0f..edfa85afcd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.utils.SetThreadName;
@@ -61,6 +62,7 @@ public class SourceHandle implements ISourceHandle {
private final TEndPoint remoteEndpoint;
private final TFragmentInstanceId remoteFragmentInstanceId;
private final TFragmentInstanceId localFragmentInstanceId;
+ private final String fullFragmentInstanceId;
private final String localPlanNodeId;
private final LocalMemoryManager localMemoryManager;
private final ExecutorService executorService;
@@ -114,6 +116,8 @@ public class SourceHandle implements ISourceHandle {
this.remoteEndpoint = Validate.notNull(remoteEndpoint);
this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+ this.fullFragmentInstanceId =
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
this.localPlanNodeId = Validate.notNull(localPlanNodeId);
this.localMemoryManager = Validate.notNull(localMemoryManager);
this.executorService = Validate.notNull(executorService);
@@ -156,7 +160,7 @@ public class SourceHandle implements ISourceHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
retainedSize);
@@ -195,7 +199,7 @@ public class SourceHandle implements ISourceHandle {
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bytesToReserve,
maxBytesCanReserve);
@@ -297,11 +301,15 @@ public class SourceHandle implements ISourceHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
aborted = true;
sourceHandleListener.onAborted(this);
}
@@ -330,11 +338,15 @@ public class SourceHandle implements ISourceHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
closed = true;
currSequenceId = lastSequenceId + 1;
sourceHandleListener.onFinished(this);
@@ -398,7 +410,7 @@ public class SourceHandle implements ISourceHandle {
"Query[%s]-[%s-%s-SourceHandle-%s]",
localFragmentInstanceId.getQueryId(),
localFragmentInstanceId.getFragmentId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId);
}
@@ -500,7 +512,7 @@ public class SourceHandle implements ISourceHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
reservedBytes);
sourceHandleListener.onFailure(SourceHandle.this, t);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index b3d2fc2ff8..b2970bb22e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -291,14 +291,11 @@ public class MemoryPool {
Validate.isTrue(bytes <= queryReservedBytes);
queryReservedBytes -= bytes;
- if (queryReservedBytes == 0) {
- queryMemoryReservations.get(queryId).get(fragmentInstanceId).remove(planNodeId);
- } else {
- queryMemoryReservations
- .get(queryId)
- .get(fragmentInstanceId)
- .put(planNodeId, queryReservedBytes);
- }
+ queryMemoryReservations
+ .get(queryId)
+ .get(fragmentInstanceId)
+ .put(planNodeId, queryReservedBytes);
+
reservedBytes -= bytes;
if (memoryReservationFutures.isEmpty()) {
@@ -349,7 +346,7 @@ public class MemoryPool {
future.set(null);
} catch (Throwable t) {
// ignore it, because we still need to notify other future
- LOGGER.error("error happened while trying to free memory: ", t);
+ LOGGER.warn("error happened while trying to free memory: ", t);
}
}
}
@@ -368,4 +365,24 @@ public class MemoryPool {
public long getReservedBytes() {
return reservedBytes;
}
+
+ public synchronized void clearMemoryReservationMap(
+ String queryId, String fragmentInstanceId, String planNodeId) {
+ if (queryMemoryReservations.get(queryId) == null
+ || queryMemoryReservations.get(queryId).get(fragmentInstanceId) == null) {
+ return;
+ }
+ Map<String, Long> planNodeIdToBytesReserved =
+ queryMemoryReservations.get(queryId).get(fragmentInstanceId);
+ if (planNodeIdToBytesReserved.get(planNodeId) == null
+ || planNodeIdToBytesReserved.get(planNodeId) <= 0) {
+ planNodeIdToBytesReserved.remove(planNodeId);
+ if (planNodeIdToBytesReserved.isEmpty()) {
+ queryMemoryReservations.get(queryId).remove(fragmentInstanceId);
+ }
+ if (queryMemoryReservations.get(queryId).isEmpty()) {
+ queryMemoryReservations.remove(queryId);
+ }
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
index 37055a6a24..bac907fa4a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.execution.exchange;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -91,7 +92,8 @@ public class LocalSinkHandleTest {
Mockito.verify(spyMemoryPool, Mockito.times(11))
.reserve(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ remoteFragmentInstanceId),
remotePlanNodeId,
mockTsBlockSize,
Long.MAX_VALUE);
@@ -107,7 +109,12 @@ public class LocalSinkHandleTest {
Assert.assertFalse(localSinkHandle.isFinished());
Assert.assertEquals(0L, localSinkHandle.getBufferRetainedSizeInBytes());
Mockito.verify(spyMemoryPool, Mockito.times(11))
- .free(queryId, localFragmentInstanceId.getInstanceId(), remotePlanNodeId, mockTsBlockSize);
+ .free(
+ queryId,
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ remoteFragmentInstanceId),
+ remotePlanNodeId,
+ mockTsBlockSize);
// Set no-more-TsBlocks.
localSinkHandle.setNoMoreTsBlocks();
@@ -179,7 +186,8 @@ public class LocalSinkHandleTest {
Mockito.verify(spyMemoryPool, Mockito.times(11))
.reserve(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ remoteFragmentInstanceId),
remotePlanNodeId,
mockTsBlockSize,
Long.MAX_VALUE);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index 0c8ebf9f73..8c033e2b02 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -116,7 +117,8 @@ public class SinkHandleTest {
// Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
// .reserve(
// queryId,
- // localFragmentInstanceId.getInstanceId(),
+ // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ // remoteFragmentInstanceId),
// localPlanNodeId,
// mockTsBlockSize * numOfMockTsBlock,
// Long.MAX_VALUE);
@@ -164,7 +166,8 @@ public class SinkHandleTest {
Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
.free(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
numOfMockTsBlock * mockTsBlockSize);
Mockito.verify(mockSinkHandleListener, Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle);
@@ -201,7 +204,8 @@ public class SinkHandleTest {
MemoryPool mockMemoryPool =
Utils.createMockBlockedMemoryPool(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
numOfMockTsBlock,
mockTsBlockSize);
@@ -261,7 +265,8 @@ public class SinkHandleTest {
// Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
// .reserve(
// queryId,
- // localFragmentInstanceId.getInstanceId(),
+ // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ // remoteFragmentInstanceId),
// localPlanNodeId,
// mockTsBlockSize * numOfMockTsBlock,
// Long.MAX_VALUE);
@@ -302,7 +307,8 @@ public class SinkHandleTest {
Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
.free(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
numOfMockTsBlock * mockTsBlockSize);
@@ -318,7 +324,8 @@ public class SinkHandleTest {
// Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(3))
// .reserve(
// queryId,
- // localFragmentInstanceId.getInstanceId(),
+ // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ // remoteFragmentInstanceId),
// localPlanNodeId,
// mockTsBlockSize * numOfMockTsBlock,
// Long.MAX_VALUE);
@@ -378,7 +385,8 @@ public class SinkHandleTest {
Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(2))
.free(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
numOfMockTsBlock * mockTsBlockSize);
Mockito.verify(mockSinkHandleListener, Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle);
@@ -401,7 +409,8 @@ public class SinkHandleTest {
MemoryPool mockMemoryPool =
Utils.createMockBlockedMemoryPool(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
numOfMockTsBlock,
mockTsBlockSize);
@@ -462,7 +471,8 @@ public class SinkHandleTest {
// Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
// .reserve(
// queryId,
- // localFragmentInstanceId.getInstanceId(),
+ // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ // remoteFragmentInstanceId),
// localPlanNodeId,
// mockTsBlockSize * numOfMockTsBlock,
// Long.MAX_VALUE);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
index aa9adbace6..ae2c2c3f6d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -233,7 +234,8 @@ public class SourceHandleTest {
Mockito.verify(spyMemoryPool, Mockito.timeout(10_000).times(6))
.reserve(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
mockTsBlockSize,
5 * mockTsBlockSize);
@@ -263,7 +265,12 @@ public class SourceHandleTest {
// The local fragment instance consumes the data blocks.
for (int i = 0; i < numOfMockTsBlock; i++) {
Mockito.verify(spyMemoryPool, Mockito.timeout(10_0000).times(i))
- .free(queryId, localFragmentInstanceId.getInstanceId(), localPlanNodeId, mockTsBlockSize);
+ .free(
+ queryId,
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
+ localPlanNodeId,
+ mockTsBlockSize);
sourceHandle.receive();
try {
if (i < 5) {