You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain text copy.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/13 03:51:55 UTC
[iotdb] 02/02: [IOTDB-5501] Fix memory leak in MemoryPool
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch changcheng_0213
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 43056b51a26ac8477a70e9f51e1596d5ca8390e3
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Fri Feb 10 11:02:52 2023 +0800
[IOTDB-5501] Fix memory leak in MemoryPool
---
.../iotdb/db/mpp/common/FragmentInstanceId.java | 7 ++++-
.../mpp/execution/exchange/SharedTsBlockQueue.java | 23 ++++++++++----
.../db/mpp/execution/exchange/SinkHandle.java | 23 ++++++++++----
.../db/mpp/execution/exchange/SourceHandle.java | 25 ++++++++++++----
.../iotdb/db/mpp/execution/memory/MemoryPool.java | 35 ++++++++++++++++------
.../execution/exchange/LocalSinkHandleTest.java | 14 +++++++--
.../db/mpp/execution/exchange/SinkHandleTest.java | 28 +++++++++++------
.../mpp/execution/exchange/SourceHandleTest.java | 10 +++++--
8 files changed, 125 insertions(+), 40 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index b1c12bb729..793066b94a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -107,6 +107,11 @@ public class FragmentInstanceId {
}
public static String createFullId(String queryId, int fragmentId, String instanceId) {
- return queryId + '.' + fragmentId + '.' + instanceId;
+ return queryId + "." + fragmentId + "." + instanceId;
+ }
+
+ public static String createFragmentInstanceIdFromTFragmentInstanceId(
+ TFragmentInstanceId tFragmentInstanceId) {
+ return tFragmentInstanceId.getFragmentId() + "." + tFragmentInstanceId.getInstanceId();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index ade913aebc..a40202de46 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.execution.exchange;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -49,6 +50,8 @@ public class SharedTsBlockQueue {
private final String localPlanNodeId;
+ private final String fullFragmentInstanceId;
+
private final LocalMemoryManager localMemoryManager;
private boolean noMoreTsBlocks = false;
@@ -81,6 +84,8 @@ public class SharedTsBlockQueue {
LocalMemoryManager localMemoryManager) {
this.localFragmentInstanceId =
Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be null");
+ this.fullFragmentInstanceId =
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be null");
this.localMemoryManager =
Validate.notNull(localMemoryManager, "local memory manager cannot be null");
@@ -159,7 +164,7 @@ public class SharedTsBlockQueue {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getRetainedSizeInBytes());
bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
@@ -186,7 +191,7 @@ public class SharedTsBlockQueue {
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getRetainedSizeInBytes(),
maxBytesCanReserve);
@@ -233,11 +238,15 @@ public class SharedTsBlockQueue {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
}
/** Destroy the queue and cancel the future. Should only be called in abnormal case */
@@ -258,11 +267,15 @@ public class SharedTsBlockQueue {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
}
/** Destroy the queue and cancel the future. Should only be called in abnormal case */
@@ -283,7 +296,7 @@ public class SharedTsBlockQueue {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 139387d510..fbc36ed34d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
@@ -69,6 +70,8 @@ public class SinkHandle implements ISinkHandle {
private final String localPlanNodeId;
private final TFragmentInstanceId localFragmentInstanceId;
+
+ private final String fullFragmentInstanceId;
private final LocalMemoryManager localMemoryManager;
private final ExecutorService executorService;
private final TsBlockSerde serde;
@@ -122,6 +125,8 @@ public class SinkHandle implements ISinkHandle {
this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
this.localPlanNodeId = Validate.notNull(localPlanNodeId);
this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+ this.fullFragmentInstanceId =
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
this.localMemoryManager = Validate.notNull(localMemoryManager);
this.executorService = Validate.notNull(executorService);
this.serde = Validate.notNull(serde);
@@ -138,7 +143,7 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) // actually we only know maxBytesCanReserve after
@@ -179,7 +184,7 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
retainedSizeInBytes,
maxBytesCanReserve)
@@ -223,11 +228,15 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
sinkHandleListener.onAborted(this);
logger.debug("[EndAbortSinkHandle]");
}
@@ -243,11 +252,15 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
sinkHandleListener.onFinish(this);
logger.debug("[EndCloseSinkHandle]");
}
@@ -327,7 +340,7 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
freedBytes);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index d164e9d1de..386fdd40b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
@@ -68,6 +69,8 @@ public class SourceHandle implements ISourceHandle {
private final TEndPoint remoteEndpoint;
private final TFragmentInstanceId remoteFragmentInstanceId;
private final TFragmentInstanceId localFragmentInstanceId;
+
+ private final String fullFragmentInstanceId;
private final String localPlanNodeId;
private final LocalMemoryManager localMemoryManager;
private final ExecutorService executorService;
@@ -123,6 +126,8 @@ public class SourceHandle implements ISourceHandle {
this.remoteEndpoint = Validate.notNull(remoteEndpoint);
this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+ this.fullFragmentInstanceId =
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
this.localPlanNodeId = Validate.notNull(localPlanNodeId);
this.localMemoryManager = Validate.notNull(localMemoryManager);
this.executorService = Validate.notNull(executorService);
@@ -172,7 +177,7 @@ public class SourceHandle implements ISourceHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
retainedSize);
@@ -214,7 +219,7 @@ public class SourceHandle implements ISourceHandle {
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bytesToReserve,
maxBytesCanReserve);
@@ -316,11 +321,15 @@ public class SourceHandle implements ISourceHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
aborted = true;
sourceHandleListener.onAborted(this);
}
@@ -349,11 +358,15 @@ public class SourceHandle implements ISourceHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId);
closed = true;
currSequenceId = lastSequenceId + 1;
sourceHandleListener.onFinished(this);
@@ -417,7 +430,7 @@ public class SourceHandle implements ISourceHandle {
"Query[%s]-[%s-%s-SourceHandle-%s]",
localFragmentInstanceId.getQueryId(),
localFragmentInstanceId.getFragmentId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId);
}
@@ -527,7 +540,7 @@ public class SourceHandle implements ISourceHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
reservedBytes);
sourceHandleListener.onFailure(SourceHandle.this, t);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index 190cf24eb0..6972111810 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -292,14 +292,11 @@ public class MemoryPool {
Validate.isTrue(bytes <= queryReservedBytes);
queryReservedBytes -= bytes;
- if (queryReservedBytes == 0) {
- queryMemoryReservations.get(queryId).get(fragmentInstanceId).remove(planNodeId);
- } else {
- queryMemoryReservations
- .get(queryId)
- .get(fragmentInstanceId)
- .put(planNodeId, queryReservedBytes);
- }
+ queryMemoryReservations
+ .get(queryId)
+ .get(fragmentInstanceId)
+ .put(planNodeId, queryReservedBytes);
+
reservedBytes -= bytes;
if (memoryReservationFutures.isEmpty()) {
@@ -350,7 +347,7 @@ public class MemoryPool {
future.set(null);
} catch (Throwable t) {
// ignore it, because we still need to notify other future
- LOGGER.error("error happened while trying to free memory: ", t);
+ LOGGER.warn("error happened while trying to free memory: ", t);
}
}
}
@@ -369,4 +366,24 @@ public class MemoryPool {
public long getReservedBytes() {
return reservedBytes;
}
+
+ public void clearMemoryReservationMap(
+ String queryId, String fragmentInstanceId, String planNodeId) {
+ if (queryMemoryReservations.get(queryId) == null
+ || queryMemoryReservations.get(queryId).get(fragmentInstanceId) == null) {
+ return;
+ }
+ Map<String, Long> planNodeIdToBytesReserved =
+ queryMemoryReservations.get(queryId).get(fragmentInstanceId);
+ if (planNodeIdToBytesReserved.get(planNodeId) == null
+ || planNodeIdToBytesReserved.get(planNodeId) <= 0) {
+ planNodeIdToBytesReserved.remove(planNodeId);
+ if (planNodeIdToBytesReserved.isEmpty()) {
+ queryMemoryReservations.get(queryId).remove(fragmentInstanceId);
+ }
+ if (queryMemoryReservations.get(queryId).isEmpty()) {
+ queryMemoryReservations.remove(queryId);
+ }
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
index 0a022ae06b..8bcb1b300b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.execution.exchange;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -86,7 +87,8 @@ public class LocalSinkHandleTest {
Mockito.verify(spyMemoryPool, Mockito.times(11))
.reserve(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ remoteFragmentInstanceId),
remotePlanNodeId,
mockTsBlockSize,
Long.MAX_VALUE);
@@ -102,7 +104,12 @@ public class LocalSinkHandleTest {
Assert.assertFalse(localSinkHandle.isFinished());
Assert.assertEquals(0L, localSinkHandle.getBufferRetainedSizeInBytes());
Mockito.verify(spyMemoryPool, Mockito.times(11))
- .free(queryId, localFragmentInstanceId.getInstanceId(), remotePlanNodeId, mockTsBlockSize);
+ .free(
+ queryId,
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ remoteFragmentInstanceId),
+ remotePlanNodeId,
+ mockTsBlockSize);
// Set no-more-TsBlocks.
localSinkHandle.setNoMoreTsBlocks();
@@ -169,7 +176,8 @@ public class LocalSinkHandleTest {
Mockito.verify(spyMemoryPool, Mockito.times(11))
.reserve(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ remoteFragmentInstanceId),
remotePlanNodeId,
mockTsBlockSize,
Long.MAX_VALUE);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index 0c8ebf9f73..d50e69dd2a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -116,7 +117,8 @@ public class SinkHandleTest {
// Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
// .reserve(
// queryId,
- // localFragmentInstanceId.getInstanceId(),
+ //
+ // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
// localPlanNodeId,
// mockTsBlockSize * numOfMockTsBlock,
// Long.MAX_VALUE);
@@ -164,7 +166,8 @@ public class SinkHandleTest {
Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
.free(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
numOfMockTsBlock * mockTsBlockSize);
Mockito.verify(mockSinkHandleListener, Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle);
@@ -201,7 +204,8 @@ public class SinkHandleTest {
MemoryPool mockMemoryPool =
Utils.createMockBlockedMemoryPool(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
numOfMockTsBlock,
mockTsBlockSize);
@@ -261,7 +265,8 @@ public class SinkHandleTest {
// Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
// .reserve(
// queryId,
- // localFragmentInstanceId.getInstanceId(),
+ //
+ // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
// localPlanNodeId,
// mockTsBlockSize * numOfMockTsBlock,
// Long.MAX_VALUE);
@@ -302,7 +307,8 @@ public class SinkHandleTest {
Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
.free(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
numOfMockTsBlock * mockTsBlockSize);
@@ -318,7 +324,8 @@ public class SinkHandleTest {
// Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(3))
// .reserve(
// queryId,
- // localFragmentInstanceId.getInstanceId(),
+ //
+ // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
// localPlanNodeId,
// mockTsBlockSize * numOfMockTsBlock,
// Long.MAX_VALUE);
@@ -378,7 +385,8 @@ public class SinkHandleTest {
Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(2))
.free(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
numOfMockTsBlock * mockTsBlockSize);
Mockito.verify(mockSinkHandleListener, Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle);
@@ -401,7 +409,8 @@ public class SinkHandleTest {
MemoryPool mockMemoryPool =
Utils.createMockBlockedMemoryPool(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
numOfMockTsBlock,
mockTsBlockSize);
@@ -462,7 +471,8 @@ public class SinkHandleTest {
// Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
// .reserve(
// queryId,
- // localFragmentInstanceId.getInstanceId(),
+ //
+ // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId),
// localPlanNodeId,
// mockTsBlockSize * numOfMockTsBlock,
// Long.MAX_VALUE);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
index 1a5ad7560f..d0b8f49f29 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -252,7 +253,8 @@ public class SourceHandleTest {
Mockito.verify(spyMemoryPool, Mockito.timeout(10_000).times(6))
.reserve(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
MOCK_TSBLOCK_SIZE,
maxBytesCanReserve);
@@ -283,7 +285,11 @@ public class SourceHandleTest {
for (int i = 0; i < numOfMockTsBlock; i++) {
Mockito.verify(spyMemoryPool, Mockito.timeout(10_0000).times(i))
.free(
- queryId, localFragmentInstanceId.getInstanceId(), localPlanNodeId, MOCK_TSBLOCK_SIZE);
+ queryId,
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
+ localPlanNodeId,
+ MOCK_TSBLOCK_SIZE);
sourceHandle.receive();
try {
if (i < 5) {