You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2023/01/16 10:35:58 UTC
[iotdb] branch bugfix/iotdb-5424 updated: [IOTDB-5424] Fix FI memory allocation limit calculation
This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch bugfix/iotdb-5424
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/bugfix/iotdb-5424 by this push:
new 253b87915c [IOTDB-5424] Fix FI memory allocation limit calculation
253b87915c is described below
commit 253b87915c4844483e92b52306d95206053e13a2
Author: ericpai <er...@hotmail.com>
AuthorDate: Mon Jan 16 18:35:36 2023 +0800
[IOTDB-5424] Fix FI memory allocation limit calculation
---
.../java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java | 1 +
.../org/apache/iotdb/commons/conf/CommonConfig.java | 12 ++----------
.../iotdb/db/mpp/execution/memory/MemoryPool.java | 3 ++-
.../db/mpp/execution/exchange/SourceHandleTest.java | 17 +++++++++++++----
4 files changed, 18 insertions(+), 15 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java
index 89bbe695a8..6afc4ae73b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java
@@ -56,6 +56,7 @@ public class IoTDBMultiDeviceIT {
.setGroupSizeInByte(1024 * 100)
.setMemtableSizeThreshold(1024 * 100)
.setPartitionInterval(100)
+ .setQueryThreadCount(2)
.setCompressor("LZ4");
EnvFactory.getEnv().initClusterEnvironment();
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 5d24b9f451..50801663c1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.consensus.ConsensusProtocolClass;
import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
import org.apache.iotdb.commons.loadbalance.LeaderDistributionPolicy;
import org.apache.iotdb.commons.loadbalance.RegionGroupExtensionPolicy;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.datastructure.TVListSortAlgorithm;
import org.apache.iotdb.commons.wal.WALMode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -625,9 +624,6 @@ public class CommonConfig {
// Default system file storage is in local file system (unsupported)
private FSType systemFileStorageFs = FSType.LOCAL;
- // Max bytes of each FragmentInstance for DataExchange
- private long maxBytesPerFragmentInstance = allocateMemoryForDataExchange / queryThreadCount;
-
private boolean rpcThriftCompressionEnable = false;
private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20);
// ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its
@@ -1712,13 +1708,9 @@ public class CommonConfig {
this.allocateMemoryForDataExchange = allocateMemoryForDataExchange;
}
+ /** Get max bytes of each FragmentInstance for DataExchange. */
public long getMaxBytesPerFragmentInstance() {
- return maxBytesPerFragmentInstance;
- }
-
- @TestOnly
- public void setMaxBytesPerFragmentInstance(long maxBytesPerFragmentInstance) {
- this.maxBytesPerFragmentInstance = maxBytesPerFragmentInstance;
+ return allocateMemoryForDataExchange / queryThreadCount;
}
public long getAllocateMemoryForTimeIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index b3d2fc2ff8..190cf24eb0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -156,8 +156,9 @@ public class MemoryPool {
bytesToReserve);
if (bytesToReserve > maxBytesCanReserve) {
LOGGER.warn(
- "Cannot reserve {} bytes memory from MemoryPool for planNodeId{}",
+ "Cannot reserve {}(Max: {}) bytes memory from MemoryPool for planNodeId{}",
bytesToReserve,
+ maxBytesCanReserve,
planNodeId);
throw new IllegalArgumentException(
"Query is aborted since it requests more memory than can be allocated.");
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
index 5bd246f59d..c1016b415b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
@@ -52,17 +52,26 @@ import java.util.stream.Stream;
public class SourceHandleTest {
private static final long MOCK_TSBLOCK_SIZE = 1024L * 1024L;
- private static long maxBytesPerFI;
+ private static int queryThreadCount;
+ private static long allocateMemoryForDataExchange;
@BeforeClass
public static void beforeClass() {
- maxBytesPerFI = CommonDescriptor.getInstance().getConf().getMaxBytesPerFragmentInstance();
- CommonDescriptor.getInstance().getConf().setMaxBytesPerFragmentInstance(5 * MOCK_TSBLOCK_SIZE);
+ queryThreadCount = CommonDescriptor.getInstance().getConf().getQueryThreadCount();
+ allocateMemoryForDataExchange =
+ CommonDescriptor.getInstance().getConf().getAllocateMemoryForDataExchange();
+ CommonDescriptor.getInstance().getConf().setQueryThreadCount(2);
+ CommonDescriptor.getInstance()
+ .getConf()
+ .setAllocateMemoryForDataExchange(5 * MOCK_TSBLOCK_SIZE * 2);
}
@AfterClass
public static void afterClass() {
- CommonDescriptor.getInstance().getConf().setMaxBytesPerFragmentInstance(maxBytesPerFI);
+ CommonDescriptor.getInstance().getConf().setQueryThreadCount(queryThreadCount);
+ CommonDescriptor.getInstance()
+ .getConf()
+ .setAllocateMemoryForDataExchange(allocateMemoryForDataExchange);
}
@Test