You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2023/01/16 10:35:58 UTC

[iotdb] branch bugfix/iotdb-5424 updated: [IOTDB-5424] Fix FI memory allocation limit calculation

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch bugfix/iotdb-5424
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/bugfix/iotdb-5424 by this push:
     new 253b87915c [IOTDB-5424] Fix FI memory allocation limit calculation
253b87915c is described below

commit 253b87915c4844483e92b52306d95206053e13a2
Author: ericpai <er...@hotmail.com>
AuthorDate: Mon Jan 16 18:35:36 2023 +0800

    [IOTDB-5424] Fix FI memory allocation limit calculation
---
 .../java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java |  1 +
 .../org/apache/iotdb/commons/conf/CommonConfig.java     | 12 ++----------
 .../iotdb/db/mpp/execution/memory/MemoryPool.java       |  3 ++-
 .../db/mpp/execution/exchange/SourceHandleTest.java     | 17 +++++++++++++----
 4 files changed, 18 insertions(+), 15 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java
index 89bbe695a8..6afc4ae73b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java
@@ -56,6 +56,7 @@ public class IoTDBMultiDeviceIT {
         .setGroupSizeInByte(1024 * 100)
         .setMemtableSizeThreshold(1024 * 100)
         .setPartitionInterval(100)
+        .setQueryThreadCount(2)
         .setCompressor("LZ4");
 
     EnvFactory.getEnv().initClusterEnvironment();
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 5d24b9f451..50801663c1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.consensus.ConsensusProtocolClass;
 import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
 import org.apache.iotdb.commons.loadbalance.LeaderDistributionPolicy;
 import org.apache.iotdb.commons.loadbalance.RegionGroupExtensionPolicy;
-import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.commons.utils.datastructure.TVListSortAlgorithm;
 import org.apache.iotdb.commons.wal.WALMode;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -625,9 +624,6 @@ public class CommonConfig {
   // Default system file storage is in local file system (unsupported)
   private FSType systemFileStorageFs = FSType.LOCAL;
 
-  // Max bytes of each FragmentInstance for DataExchange
-  private long maxBytesPerFragmentInstance = allocateMemoryForDataExchange / queryThreadCount;
-
   private boolean rpcThriftCompressionEnable = false;
   private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20);
   // ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its
@@ -1712,13 +1708,9 @@ public class CommonConfig {
     this.allocateMemoryForDataExchange = allocateMemoryForDataExchange;
   }
 
+  /** Get max bytes of each FragmentInstance for DataExchange, computed on each call. */
   public long getMaxBytesPerFragmentInstance() {
-    return maxBytesPerFragmentInstance;
-  }
-
-  @TestOnly
-  public void setMaxBytesPerFragmentInstance(long maxBytesPerFragmentInstance) {
-    this.maxBytesPerFragmentInstance = maxBytesPerFragmentInstance;
+    return allocateMemoryForDataExchange / queryThreadCount;
   }
 
   public long getAllocateMemoryForTimeIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index b3d2fc2ff8..190cf24eb0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -156,8 +156,9 @@ public class MemoryPool {
         bytesToReserve);
     if (bytesToReserve > maxBytesCanReserve) {
       LOGGER.warn(
-          "Cannot reserve {} bytes memory from MemoryPool for planNodeId{}",
+          "Cannot reserve {}(Max: {}) bytes memory from MemoryPool for planNodeId{}",
           bytesToReserve,
+          maxBytesCanReserve,
           planNodeId);
       throw new IllegalArgumentException(
           "Query is aborted since it requests more memory than can be allocated.");
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
index 5bd246f59d..c1016b415b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
@@ -52,17 +52,26 @@ import java.util.stream.Stream;
 public class SourceHandleTest {
   private static final long MOCK_TSBLOCK_SIZE = 1024L * 1024L;
 
-  private static long maxBytesPerFI;
+  private static int queryThreadCount;
+  private static long allocateMemoryForDataExchange;
 
   @BeforeClass
   public static void beforeClass() {
-    maxBytesPerFI = CommonDescriptor.getInstance().getConf().getMaxBytesPerFragmentInstance();
-    CommonDescriptor.getInstance().getConf().setMaxBytesPerFragmentInstance(5 * MOCK_TSBLOCK_SIZE);
+    queryThreadCount = CommonDescriptor.getInstance().getConf().getQueryThreadCount();
+    allocateMemoryForDataExchange =
+        CommonDescriptor.getInstance().getConf().getAllocateMemoryForDataExchange();
+    CommonDescriptor.getInstance().getConf().setQueryThreadCount(2);
+    CommonDescriptor.getInstance()
+        .getConf()
+        .setAllocateMemoryForDataExchange(5 * MOCK_TSBLOCK_SIZE * 2);
   }
 
   @AfterClass
   public static void afterClass() {
-    CommonDescriptor.getInstance().getConf().setMaxBytesPerFragmentInstance(maxBytesPerFI);
+    CommonDescriptor.getInstance().getConf().setQueryThreadCount(queryThreadCount);
+    CommonDescriptor.getInstance()
+        .getConf()
+        .setAllocateMemoryForDataExchange(allocateMemoryForDataExchange);
   }
 
   @Test