You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/03/07 00:35:10 UTC

[iotdb] 03/03: [To rel/1.1][IOTDB-5620] Fix flush stuck when there are a lot of time partitions in each DataRegion (#9225)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch rc/1.1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 32f9db010848cbbc251b1fecbc68f4b3e5404420
Author: Haonan <hh...@outlook.com>
AuthorDate: Mon Mar 6 20:42:15 2023 +0800

    [To rel/1.1][IOTDB-5620] Fix flush stuck when there are a lot of time partitions in each DataRegion (#9225)
---
 .../iotdb/it/env/cluster/MppCommonConfig.java      |  6 ++
 .../it/env/cluster/MppSharedCommonConfig.java      |  7 +++
 .../iotdb/it/env/remote/RemoteCommonConfig.java    |  5 ++
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |  2 +
 .../iotdb/db/it/IoTDBInsertMultiPartitionIT.java   | 71 ++++++++++++++++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  1 -
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  2 +-
 .../engine/storagegroup/TimePartitionManager.java  |  9 ++-
 8 files changed, 100 insertions(+), 3 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
index c1bcbde4ec..fe9f3a6767 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
@@ -334,4 +334,10 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig {
     setProperty("schema_memory_allocate_proportion", String.valueOf(schemaMemoryAllocate));
     return this;
   }
+
+  @Override
+  public CommonConfig setWriteMemoryProportion(String writeMemoryProportion) {
+    setProperty("write_memory_proportion", writeMemoryProportion);
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
index ddaba5a331..275ed1f8f8 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
@@ -343,4 +343,11 @@ public class MppSharedCommonConfig implements CommonConfig {
     cnConfig.setSchemaMemoryAllocate(schemaMemoryAllocate);
     return this;
   }
+
+  @Override
+  public CommonConfig setWriteMemoryProportion(String writeMemoryProportion) {
+    dnConfig.setWriteMemoryProportion(writeMemoryProportion);
+    cnConfig.setWriteMemoryProportion(writeMemoryProportion);
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
index 440b38253e..526df5dbab 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
@@ -246,4 +246,9 @@ public class RemoteCommonConfig implements CommonConfig {
   public CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate) {
     return this;
   }
+
+  @Override
+  public CommonConfig setWriteMemoryProportion(String writeMemoryProportion) {
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 59dcd778ab..4792b8b3e7 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -110,4 +110,6 @@ public interface CommonConfig {
   CommonConfig setSeriesSlotNum(int seriesSlotNum);
 
   CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate);
+
+  CommonConfig setWriteMemoryProportion(String writeMemoryProportion);
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBInsertMultiPartitionIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBInsertMultiPartitionIT.java
new file mode 100644
index 0000000000..fac1175f74
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBInsertMultiPartitionIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.itbase.category.RemoteIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class, RemoteIT.class})
+public class IoTDBInsertMultiPartitionIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getDataNodeCommonConfig()
+        .setWriteMemoryProportion("10000000:1");
+    EnvFactory.getEnv().initClusterEnvironment();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testInsertMultiPartition() {
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.sg.d1(time,s1) values(1,2)");
+      statement.execute("flush");
+      statement.execute("insert into root.sg.d1(time,s1) values(2,2)");
+      statement.execute("insert into root.sg.d1(time,s1) values(604800001,2)");
+      statement.execute("flush");
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 75a461b006..70a0b0f3da 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1785,7 +1785,6 @@ public class IoTDBConfig {
 
   public void setAllocateMemoryForStorageEngine(long allocateMemoryForStorageEngine) {
     this.allocateMemoryForStorageEngine = allocateMemoryForStorageEngine;
-    this.allocateMemoryForTimePartitionInfo = allocateMemoryForStorageEngine * 50 / 1001;
   }
 
   public long getAllocateMemoryForSchema() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index a4c40333ab..0432754ab7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1612,7 +1612,7 @@ public class IoTDBDescriptor {
     double memtableProportionForWrite =
         ((double) (proportionForMemTable)
             / (double) (proportionForMemTable + proportionForTimePartitionInfo));
-    Double.parseDouble(properties.getProperty("flush_time_memory_proportion", "0.05"));
+
     double timePartitionInfoForWrite =
         ((double) (proportionForTimePartitionInfo)
             / (double) (proportionForMemTable + proportionForTimePartitionInfo));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java
index 1f77b36b82..640f8dfb95 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TimePartitionManager.java
@@ -24,6 +24,9 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
@@ -31,6 +34,7 @@ import java.util.TreeSet;
 
 /** Manage all the time partitions for all data regions and control the total memory of them */
 public class TimePartitionManager {
+  private static final Logger logger = LoggerFactory.getLogger(TimePartitionManager.class);
   final Map<DataRegionId, Map<Long, TimePartitionInfo>> timePartitionInfoMap;
 
   long memCost = 0;
@@ -102,7 +106,10 @@ public class TimePartitionManager {
       }
 
       while (memCost > timePartitionInfoMemoryThreshold) {
-        TimePartitionInfo timePartitionInfo = treeSet.first();
+        TimePartitionInfo timePartitionInfo = treeSet.pollFirst();
+        if (timePartitionInfo == null) {
+          return;
+        }
         memCost -= timePartitionInfo.memSize;
         DataRegion dataRegion =
             StorageEngine.getInstance().getDataRegion(timePartitionInfo.dataRegionId);